Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 978067AD2 for ; Thu, 3 Nov 2011 21:44:50 +0000 (UTC) Received: (qmail 4780 invoked by uid 500); 3 Nov 2011 21:44:50 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 4747 invoked by uid 500); 3 Nov 2011 21:44:49 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 4740 invoked by uid 99); 3 Nov 2011 21:44:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Nov 2011 21:44:49 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Nov 2011 21:44:46 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8CA1E23889FA for ; Thu, 3 Nov 2011 21:44:26 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1197318 - in /pig/branches/branch-0.10: ./ src/org/apache/pig/builtin/ test/e2e/pig/tests/ test/org/apache/pig/test/ Date: Thu, 03 Nov 2011 21:44:26 -0000 To: commits@pig.apache.org From: gates@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111103214426.8CA1E23889FA@eris.apache.org> Author: gates Date: Thu Nov 3 21:44:25 2011 New Revision: 1197318 URL: http://svn.apache.org/viewvc?rev=1197318&view=rev Log: PIG-2328: Add builtin UDFs for building and using bloom filters Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java 
pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java Modified: pig/branches/branch-0.10/CHANGES.txt pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf Modified: pig/branches/branch-0.10/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1197318&r1=1197317&r2=1197318&view=diff ============================================================================== --- pig/branches/branch-0.10/CHANGES.txt (original) +++ pig/branches/branch-0.10/CHANGES.txt Thu Nov 3 21:44:25 2011 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-2328: Add builtin UDFs for building and using bloom filters (gates) + PIG-2334: Set default number of reducers for S3N filesystem (ddaniels888 via daijy) PIG-1387: Syntactical Sugar for PIG-1385 (azaroth) Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java?rev=1197318&view=auto ============================================================================== --- pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java (added) +++ pig/branches/branch-0.10/src/org/apache/pig/builtin/Bloom.java Thu Nov 3 21:44:25 2011 @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.builtin; + + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; + +import org.apache.pig.FilterFunc; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +/** + * Use a Bloom filter built previously by BuildBloom. You would first + * build a bloom filter in a group all job. For example: + * define bb BuildBloom('jenkins', '100', '0.1'); + * A = load 'foo' as (x, y); + * B = group A all; + * C = foreach B generate bb(A.x); + * store C into 'mybloom'; + * The bloom filter can be on multiple keys by passing more than one field + * (or the entire bag) to BuildBloom. + * The resulting file can then be used in a Bloom filter as: + * define bloom Bloom('mybloom'); + * A = load 'foo' as (x, y); + * B = load 'bar' as (z); + * C = filter B by bloom(z); + * D = join C by z, A by x; + * A single argument is hashed by itself and several arguments are hashed + * as one tuple, so pass the same fields that were given to BuildBloom. + * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ */ +public class Bloom extends FilterFunc { + + private String bloomFile; + public BloomFilter filter = null; + + /** + * @param filename file containing the serialized Bloom filter + */ + public Bloom(String filename) { + bloomFile = filename; + } + + /** + * Test whether the key formed from the input may be in the filter. + * The filter is loaded on the first call if it has not been set yet. + * @return false if the key is definitely absent, true if it may be + * present (Bloom filters can report false positives). + */ + @Override + public Boolean exec(Tuple input) throws IOException { + if (filter == null) { + init(); + } + // Serialize the key the same way BuildBloom does: a lone field by + // itself, several fields as a whole tuple. + byte[] b; + if (input.size() == 1) b = DataType.toBytes(input.get(0)); + else b = DataType.toBytes(input, DataType.TUPLE); + + Key k = new Key(b); + return filter.membershipTest(k); + } + + @Override + public List<String> getCacheFiles() { + List<String> list = new ArrayList<String>(1); + // We were passed the name of the file on HDFS. Append a + // name for the file on the task node. + try { + list.add(bloomFile + "#" + getFilenameFromPath(bloomFile)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return list; + } + + /** + * Load the filter from the local name registered in getCacheFiles(). + */ + private void init() throws IOException { + String dcFile = "./" + getFilenameFromPath(bloomFile) + + "/part-r-00000"; + // Read into a local and publish it only after a successful read, so + // a failed load cannot leave behind an empty filter that later calls + // would silently reuse. The stream is always closed. + BloomFilter f = new BloomFilter(); + DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); + try { + f.readFields(dis); + } finally { + dis.close(); + } + filter = f; + } + + /** + * For testing only, do not use directly.
+ */ + public void setFilter(DataByteArray dba) throws IOException { + DataInputStream dis = new DataInputStream(new + ByteArrayInputStream(dba.get())); + BloomFilter f = new BloomFilter(); + f.readFields(dis); + filter = f; + } + + /** + * Map an HDFS path to the local link name used in the distributed + * cache by replacing every '/' with '_'. + */ + private String getFilenameFromPath(String p) throws IOException { + return p.replace("/", "_"); + } + +} Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java?rev=1197318&view=auto ============================================================================== --- pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java (added) +++ pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloom.java Thu Nov 3 21:44:25 2011 @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.pig.builtin; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; + +import org.apache.pig.Algebraic; +import org.apache.pig.EvalFunc; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +/** + * Build a bloom filter for use later in Bloom. This UDF is intended to run + * in a group all job. For example: + * define bb BuildBloom('jenkins', '100', '0.1'); + * A = load 'foo' as (x, y); + * B = group A all; + * C = foreach B generate bb(A.x); + * store C into 'mybloom'; + * The bloom filter can be on multiple keys by passing more than one field + * (or the entire bag) to BuildBloom. + * The resulting file can then be used in a Bloom filter as: + * define bloom Bloom('mybloom'); + * A = load 'foo' as (x, y); + * B = load 'bar' as (z); + * C = filter B by bloom(z); + * D = join C by z, A by x; + * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}. + */ +public class BuildBloom extends BuildBloomBase<DataByteArray> implements Algebraic { + + /** + * Build a bloom filter of fixed size and number of hash functions. + * @param hashType type of the hashing function (see + * {@link org.apache.hadoop.util.hash.Hash}); matched + * case-insensitively, it must contain "jenkins" or "murmur". + * @param mode Will be ignored, though by convention it should be + * "fixed" or "fixedsize" + * @param vectorSize The vector size of this filter. + * @param nbHash The number of hash functions to consider. + */ + public BuildBloom(String hashType, + String mode, + String vectorSize, + String nbHash) { + super(hashType, mode, vectorSize, nbHash); + } + + /** + * Construct a Bloom filter based on expected number of elements and + * desired accuracy.
+ * @param hashType type of the hashing function (see + * {@link org.apache.hadoop.util.hash.Hash}); matched + * case-insensitively, it must contain "jenkins" or "murmur". + * @param numElements The number of distinct elements expected to be + * placed in this filter. + * @param desiredFalsePositive the acceptable rate of false positives. + * This should be a floating point value strictly between 0 and 1.0, + * where 1.0 would be 100% (ie, a totally useless filter). + */ + public BuildBloom(String hashType, + String numElements, + String desiredFalsePositive) { + super(hashType, numElements, desiredFalsePositive); + } + + /** + * Not supported: this UDF only runs in its algebraic form. + * @throws IOException always + */ + @Override + public DataByteArray exec(Tuple input) throws IOException { + throw new IOException("This must be used with algebraic!"); + } + + public String getInitial() { + return Initial.class.getName(); + } + + public String getIntermed() { + return Intermediate.class.getName(); + } + + public String getFinal() { + return Final.class.getName(); + } + + /** + * Initial step: builds a filter from the tuples in its input bag and + * returns it, serialized, as the only field of a tuple. + */ + static public class Initial extends BuildBloomBase<Tuple> { + + public Initial() { + } + + public Initial(String hashType, + String mode, + String vectorSize, + String nbHash ) { + super(hashType, mode, vectorSize, nbHash); + } + + public Initial(String hashType, + String numElements, + String desiredFalsePositive) { + super(hashType, numElements, desiredFalsePositive); + } + + @Override + public Tuple exec(Tuple input) throws IOException { + if (input == null || input.size() == 0) return null; + + // Strip off the initial level of bag and add every tuple in it to + // a fresh filter. An empty bag yields an empty filter. + DataBag values = (DataBag)input.get(0); + filter = new BloomFilter(vSize, numHash, hType); + for (Iterator<Tuple> it = values.iterator(); it.hasNext();) { + Tuple t = it.next(); + + // If the input tuple has only one field, then we'll extract + // that field and serialize it into a key. If it has multiple + // fields, we'll serialize the whole tuple.
+ byte[] b; + if (t.size() == 1) b = DataType.toBytes(t.get(0)); + else b = DataType.toBytes(t, DataType.TUPLE); + + filter.add(new Key(b)); + } + + return TupleFactory.getInstance().newTuple(bloomOut()); + } + } + + /** + * Intermediate step: ORs together the serialized filters in its input + * bag and returns the result, serialized, as the only field of a tuple. + */ + static public class Intermediate extends BuildBloomBase<Tuple> { + + public Intermediate() { + } + + public Intermediate(String hashType, + String mode, + String vectorSize, + String nbHash ) { + super(hashType, mode, vectorSize, nbHash); + } + + public Intermediate(String hashType, + String numElements, + String desiredFalsePositive) { + super(hashType, numElements, desiredFalsePositive); + } + + + @Override + public Tuple exec(Tuple input) throws IOException { + return TupleFactory.getInstance().newTuple(bloomOr(input)); + } + } + + /** + * Final step: ORs together the serialized filters in its input bag and + * returns the serialized result. + */ + static public class Final extends BuildBloomBase<DataByteArray> { + + public Final() { + } + + public Final(String hashType, + String mode, + String vectorSize, + String nbHash ) { + super(hashType, mode, vectorSize, nbHash); + } + + public Final(String hashType, + String numElements, + String desiredFalsePositive) { + super(hashType, numElements, desiredFalsePositive); + } + + @Override + public DataByteArray exec(Tuple input) throws IOException { + return bloomOr(input); + } + } + + @Override + public Schema outputSchema(Schema input) { + return new Schema(new Schema.FieldSchema(null, DataType.BYTEARRAY)); + } + +} Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1197318&view=auto ============================================================================== --- pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java (added) +++ pig/branches/branch-0.10/src/org/apache/pig/builtin/BuildBloomBase.java Thu Nov 3 21:44:25 2011 @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.builtin; + +import java.io.IOException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.Iterator; +import java.util.Locale; + +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.hash.Hash; + +import org.apache.pig.EvalFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; + +/** + * A Base class for BuildBloom and its Algebraic implementations. + * @param <T> the type returned by exec + */ +public abstract class BuildBloomBase<T> extends EvalFunc<T> { + + protected int vSize; // vector size of the filter, in bits + protected int numHash; // number of hash functions + protected int hType; // hash type constant from convertHashType + protected BloomFilter filter; + + protected BuildBloomBase() { + } + + /** + * @param hashType type of the hashing function (see + * {@link org.apache.hadoop.util.hash.Hash}). + * @param mode Will be ignored, though by convention it should be + * "fixed" or "fixedsize" + * @param vectorSize The vector size of this filter. + * @param nbHash The number of hash functions to consider.
+ */ + public BuildBloomBase(String hashType, + String mode, + String vectorSize, + String nbHash) { + vSize = Integer.valueOf(vectorSize); + numHash = Integer.valueOf(nbHash); + hType = convertHashType(hashType); + } + + /** + * @param hashType type of the hashing function (see + * {@link org.apache.hadoop.util.hash.Hash}). + * @param numElements The number of distinct elements expected to be + * placed in this filter. + * @param desiredFalsePositive the acceptable rate of false positives. + * This should be a floating point value strictly between 0 and 1.0, + * where 1.0 would be 100% (ie, a totally useless filter). + */ + public BuildBloomBase(String hashType, + String numElements, + String desiredFalsePositive) { + setSize(numElements, desiredFalsePositive); + hType = convertHashType(hashType); + } + + /** + * OR together the serialized filters held in the first field of each + * tuple of the bag in field 0 of input. + * @return the serialized combined filter + */ + protected DataByteArray bloomOr(Tuple input) throws IOException { + filter = new BloomFilter(vSize, numHash, hType); + + try { + DataBag values = (DataBag)input.get(0); + for (Iterator<Tuple> it = values.iterator(); it.hasNext();) { + Tuple t = it.next(); + filter.or(bloomIn((DataByteArray)t.get(0))); + } + } catch (ExecException ee) { + throw new IOException(ee); + } + + return bloomOut(); + } + + protected DataByteArray bloomOut() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(vSize / 8); + DataOutputStream dos = new DataOutputStream(baos); + filter.write(dos); + return new DataByteArray(baos.toByteArray()); + } + + protected BloomFilter bloomIn(DataByteArray b) throws IOException { + DataInputStream dis = new DataInputStream(new + ByteArrayInputStream(b.get())); + BloomFilter f = new BloomFilter(); + f.readFields(dis); + return f; + } + + private int convertHashType(String hashType) { + // Lower-case with a fixed locale: under e.g. a Turkish default + // locale "JENKINS" would not lower-case to "jenkins". + String lower = hashType.toLowerCase(Locale.ENGLISH); + if (lower.contains("jenkins")) { + return Hash.JENKINS_HASH; + } else if (lower.contains("murmur")) { + return Hash.MURMUR_HASH; + } else { + throw new RuntimeException("Unknown hash type " + hashType + + ".
Valid values are jenkins and murmur."); + } + } + + private void setSize(String numElements, String desiredFalsePositive) { + int num = Integer.valueOf(numElements); + float fp = Float.valueOf(desiredFalsePositive); + // fp must be strictly positive: log(0) is -Infinity, which would + // saturate vSize at Integer.MAX_VALUE. The negated range check also + // rejects NaN. + if (num < 1 || !(fp > 0.0 && fp < 1.0)) { + throw new RuntimeException("Number of elements must be greater " + + "than zero and desiredFalsePositive must be between 0 " + + " and 1."); + } + // m = -n * ln(p) / (ln 2)^2 bits; clamp to at least one bit because + // a large fp with a small n truncates to zero. + vSize = Math.max(1, + (int)(-1 * (num * Math.log(fp)) / Math.pow(Math.log(2), 2))); + log.info("BuildBloom setting vector size to " + vSize); + + // k = (m / n) * ln 2, with ln 2 approximated as 0.7; clamp to at + // least one hash function because the truncation can yield zero. + numHash = Math.max(1, (int)(0.7 * vSize / num)); + log.info("BuildBloom setting number of hashes to " + numHash); + } + + +} Modified: pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf?rev=1197318&r1=1197317&r2=1197318&view=diff ============================================================================== --- pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf (original) +++ pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf Thu Nov 3 21:44:25 2011 @@ -3882,7 +3882,78 @@ store E into ':OUTPATH:';\, store D into ':OUTPATH:';?, } ], - }, + },{ + 'name' => 'Bloom', + 'execonly' => 'mapred', # distributed cache does not work in local mode + 'tests' => [ + { + 'num' => 1, + 'pig' => "define bb BuildBloom('Hash.JENKINS_HASH', 'fixed', '128', '3'); + fs -rmr :TMP:/mybloom_1; + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + B = filter A by name == 'alice allen'; + C = group B all; + D = foreach C generate bb(B.name); + store D into ':TMP:/mybloom_1'; + exec; + define bloom Bloom(':TMP:/mybloom_1'); + E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + F = filter E by bloom(name); + store F into ':OUTPATH:';", + 'notmq' => 1, + 'verify_pig_script' => " + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double); + B = filter A by name == 'alice allen'; +
store B into ':OUTPATH:';", + }, { + 'num' => 2, + 'pig' => "define bb BuildBloom('Hash.MURMUR_HASH', 'fixed', '128', '3'); + fs -rmr :TMP:/mybloom_2; + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + B = filter A by name == 'alice allen'; + C = group B all; + D = foreach C generate bb(B.name); + store D into ':TMP:/mybloom_2'; + exec; + define bloom Bloom(':TMP:/mybloom_2'); + E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + F = filter E by bloom(name); + G = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float); + H = join F by name, G by name; + store H into ':OUTPATH:';", + 'notmq' => 1, + 'verify_pig_script' => " + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double); + B = filter A by name == 'alice allen'; + C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float); + D = join B by name, C by name; + store D into ':OUTPATH:';", + },{ + 'num' => 3, + 'pig' => "define bb BuildBloom('Hash.JENKINS_HASH', '1', '0.0001'); + fs -rmr :TMP:/mybloom_3; + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + B = filter A by name == 'alice allen'; + C = group B all; + D = foreach C generate bb(B.name); + store D into ':TMP:/mybloom_3'; + exec; + define bloom Bloom(':TMP:/mybloom_3'); + E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + F = filter E by bloom(name); + G = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float); + H = join G by name, F by name using 'repl'; + store H into ':OUTPATH:';", + 'notmq' => 1, + 'verify_pig_script' => " + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double); + B = filter A by name == 'alice allen'; + C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float); + D = 
join C by name, B by name; + store D into ':OUTPATH:';", + } + ], + } ], }, ; Added: pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java?rev=1197318&view=auto ============================================================================== --- pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java (added) +++ pig/branches/branch-0.10/test/org/apache/pig/test/TestBloom.java Thu Nov 3 21:44:25 2011 @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import org.junit.Test; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.builtin.Bloom; +import org.apache.pig.builtin.BuildBloom; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; + +/** + * This class unit tests the built in UDFs BuildBloom and Bloom. 
+ */ +public class TestBloom extends junit.framework.TestCase { + + static class TestBuildBloom extends BuildBloom { + + TestBuildBloom(String numElements, String desiredFalsePositive) { + super("jenkins", numElements, desiredFalsePositive); + } + + int getSize() { return vSize; } + int getNumHash() { return numHash; } + + } + + @Test + public void testSizeCalc() throws Exception { + + TestBuildBloom tbb = new TestBuildBloom("1000", "0.01"); + assertEquals(9585, tbb.getSize()); + assertEquals(6, tbb.getNumHash()); + tbb = new TestBuildBloom("1000000", "0.01"); + assertEquals(9585058 , tbb.getSize()); + assertEquals(6, tbb.getNumHash()); + tbb = new TestBuildBloom("1000", "0.0001"); + assertEquals(19170, tbb.getSize()); + assertEquals(13, tbb.getNumHash()); + tbb = new TestBuildBloom("1000000", "0.00001"); + assertEquals(23962645, tbb.getSize()); + assertEquals(16, tbb.getNumHash()); + } + + @Test + public void testBadHash() throws Exception { + String size = "100"; + String numHash = "3"; + String hashFunc = "nosuchhash"; + boolean caughtException = false; + try { + BuildBloom bb = new BuildBloom(hashFunc, "fixed", size, numHash); + } catch (RuntimeException re) { + assertTrue(re.getMessage().contains("Unknown hash type")); + caughtException = true; + } + assertTrue(caughtException); + } + + @Test + public void testFuncNames() throws Exception { + String size = "100"; + String numHash = "3"; + String hashFunc = "JENKINS_HASH"; + BuildBloom bb = new BuildBloom(hashFunc, "fixed", size, numHash); + assertEquals("org.apache.pig.builtin.BuildBloom$Initial", + bb.getInitial()); + assertEquals("org.apache.pig.builtin.BuildBloom$Intermediate", + bb.getIntermed()); + assertEquals("org.apache.pig.builtin.BuildBloom$Final", + bb.getFinal()); + } + + @Test + public void testMap() throws Exception { + String size = "100"; + String numHash = "3"; + String hashFunc = "JENKINS"; + TupleFactory tf = TupleFactory.getInstance(); + BagFactory bf = BagFactory.getInstance(); + + Tuple t
= tf.newTuple(1); + t.set(0, 1); + DataBag b = bf.newDefaultBag(); + b.add(t); + Tuple input = tf.newTuple(b); + + BuildBloom.Initial map = + new BuildBloom.Initial(hashFunc, "fixed", size, numHash); + t = map.exec(input); + + Bloom bloom = new Bloom("bla"); + bloom.setFilter((DataByteArray)t.get(0)); + + // Test that everything we put in passes. + Tuple t1 = tf.newTuple(1); + t1.set(0, 1); + assertTrue(bloom.exec(t1)); + + // Keys that were never added. A Bloom filter may report false + // positives, so require only that most of them are rejected. + int falsePositives = 0; + for (int i = 100; i < 110; i++) { + Tuple t2 = tf.newTuple(1); + t2.set(0, i); + if (bloom.exec(t2)) falsePositives++; + } + assertTrue("too many false positives: " + falsePositives, + falsePositives < 5); + } + + @Test + public void testCombiner() throws Exception { + String size = "100"; + String numHash = "3"; + String hashFunc = "jenkins"; + TupleFactory tf = TupleFactory.getInstance(); + BagFactory bf = BagFactory.getInstance(); + + DataBag combinerBag = bf.newDefaultBag(); + for (int j = 0; j < 3; j++) { // map loop + Tuple t = tf.newTuple(1); + t.set(0, 10 + j); + DataBag mapBag = bf.newDefaultBag(); + mapBag.add(t); + Tuple input = tf.newTuple(mapBag); + BuildBloom.Initial map = + new BuildBloom.Initial(hashFunc, "fixed", size, numHash); + combinerBag.add(map.exec(input)); + } + Tuple t = tf.newTuple(1); + t.set(0, combinerBag); + BuildBloom.Intermediate combiner = + new BuildBloom.Intermediate(hashFunc, "fixed", size, numHash); + t = combiner.exec(t); + + Bloom bloom = new Bloom("bla"); + bloom.setFilter((DataByteArray)t.get(0)); + + // Test that everything we put in passes.
+ for (int j = 0; j < 3; j++) { + Tuple t1 = tf.newTuple(1); + t1.set(0, 10 + j); + assertTrue(bloom.exec(t1)); + } + + // Keys that were never added. A Bloom filter may report false + // positives, so require only that most of them are rejected. + int falsePositives = 0; + for (int i = 100; i < 110; i++) { + Tuple t2 = tf.newTuple(1); + t2.set(0, i); + if (bloom.exec(t2)) falsePositives++; + } + assertTrue("too many false positives: " + falsePositives, + falsePositives < 5); + } + + @Test + public void testSingleKey() throws Exception { + String size = "100"; + String numHash = "3"; + String hashFunc = "MURMUR"; + TupleFactory tf = TupleFactory.getInstance(); + BagFactory bf = BagFactory.getInstance(); + + DataBag reducerBag = bf.newDefaultBag(); + for (int i = 0; i < 3; i++) { // combiner loop + DataBag combinerBag = bf.newDefaultBag(); + for (int j = 0; j < 3; j++) { // map loop + Tuple t = tf.newTuple(1); + t.set(0, i * 10 + j); + DataBag mapBag = bf.newDefaultBag(); + mapBag.add(t); + Tuple input = tf.newTuple(mapBag); + BuildBloom.Initial map = + new BuildBloom.Initial(hashFunc, "fixed", size, numHash); + combinerBag.add(map.exec(input)); + } + Tuple t = tf.newTuple(1); + t.set(0, combinerBag); + BuildBloom.Intermediate combiner = + new BuildBloom.Intermediate(hashFunc, "fixed", size, numHash); + reducerBag.add(combiner.exec(t)); + } + + Tuple t = tf.newTuple(1); + t.set(0, reducerBag); + BuildBloom.Final reducer = + new BuildBloom.Final(hashFunc, "fixed", size, numHash); + DataByteArray dba = reducer.exec(t); + + Bloom bloom = new Bloom("bla"); + bloom.setFilter(dba); + + // Test that everything we put in passes.
+ for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + Tuple t1 = tf.newTuple(1); + t1.set(0, i * 10 + j); + assertTrue(bloom.exec(t1)); + } + } + + // Keys that were never added. A Bloom filter may report false + // positives, so require only that most of them are rejected. + int falsePositives = 0; + for (int i = 100; i < 110; i++) { + Tuple t1 = tf.newTuple(1); + t1.set(0, i); + if (bloom.exec(t1)) falsePositives++; + } + assertTrue("too many false positives: " + falsePositives, + falsePositives < 5); + } + + @Test + public void testMultiKey() throws Exception { + String numElements = "10"; + String falsePositive = "0.001"; + String hashFunc = "murmur"; + TupleFactory tf = TupleFactory.getInstance(); + BagFactory bf = BagFactory.getInstance(); + + String[][] strs = { + { "fred", "joe", "bob" }, + { "mary", "sally", "jane" }, + { "fido", "spot", "fluffly" }}; + + DataBag reducerBag = bf.newDefaultBag(); + for (int i = 0; i < 3; i++) { // combiner loop + DataBag combinerBag = bf.newDefaultBag(); + for (int j = 0; j < 3; j++) { // map loop + Tuple t = tf.newTuple(2); + t.set(0, i * 10 + j); + t.set(1, strs[i][j]); + DataBag mapBag = bf.newDefaultBag(); + mapBag.add(t); + Tuple input = tf.newTuple(mapBag); + BuildBloom.Initial map = + new BuildBloom.Initial(hashFunc, numElements, + falsePositive); + combinerBag.add(map.exec(input)); + } + Tuple t = tf.newTuple(1); + t.set(0, combinerBag); + BuildBloom.Intermediate combiner = + new BuildBloom.Intermediate(hashFunc, numElements, + falsePositive); + reducerBag.add(combiner.exec(t)); + } + + Tuple t = tf.newTuple(1); + t.set(0, reducerBag); + BuildBloom.Final reducer = + new BuildBloom.Final(hashFunc, numElements, falsePositive); + DataByteArray dba = reducer.exec(t); + + Bloom bloom = new Bloom("bla"); + bloom.setFilter(dba); + + // Test that everything we put in passes. + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + Tuple t1 = tf.newTuple(2); + t1.set(0, i * 10 + j); + t1.set(1, strs[i][j]); + assertTrue(bloom.exec(t1)); + } + } + + // Keys that were never added. A Bloom filter may report false + // positives, so require only that most of them are rejected. + int falsePositives = 0; + for (int i = 100; i < 110; i++) { + Tuple t1 = tf.newTuple(2); + t1.set(0, i); + t1.set(1, "ichabod"); + if (bloom.exec(t1)) falsePositives++; + } + assertTrue("too many false positives: " + falsePositives, + falsePositives < 5); + }}