Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6BB97D50E for ; Sat, 14 Jul 2012 17:28:54 +0000 (UTC) Received: (qmail 11425 invoked by uid 500); 14 Jul 2012 17:28:54 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 11311 invoked by uid 500); 14 Jul 2012 17:28:54 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 10910 invoked by uid 99); 14 Jul 2012 17:28:53 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 14 Jul 2012 17:28:53 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5D79714333; Sat, 14 Jul 2012 17:28:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [10/19] CRUNCH-17: Split out Crunch integration tests. Contributed by Matthias Friedrich. 
Message-Id: <20120714172853.5D79714333@tyr.zones.apache.org> Date: Sat, 14 Jul 2012 17:28:53 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java new file mode 100644 index 0000000..e2fa9a2 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.collect; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PTableKeyValueIT; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.io.To; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.WritableTypeFamily; +import com.google.common.collect.Lists; + +@RunWith(value = Parameterized.class) +public class UnionCollectionIT { + + private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class); + + private PTypeFamily typeFamily; + private Pipeline pipeline; + private PCollection union; + + private ArrayList EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e"); + + @Before + @SuppressWarnings("unchecked") + public void setUp() throws IOException { + String inputFile1 = FileHelper.createTempCopyOf("set1.txt"); + String inputFile2 = FileHelper.createTempCopyOf("set2.txt"); + + PCollection firstCollection = pipeline.read(At.textFile(inputFile1, + typeFamily.strings())); + PCollection secondCollection = pipeline.read(At.textFile(inputFile2, + typeFamily.strings())); + + LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : " + + 
typeFamily.getClass().getSimpleName() + "] First: " + + Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: " + + Lists.newArrayList(secondCollection.materialize().iterator())); + + union = secondCollection.union(firstCollection); + } + + @After + public void tearDown() { + pipeline.done(); + } + + @Parameters + public static Collection data() throws IOException { + Object[][] data = new Object[][] { + { WritableTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) }, + { WritableTypeFamily.getInstance(), MemPipeline.getInstance() }, + { AvroTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) }, + { AvroTypeFamily.getInstance(), MemPipeline.getInstance() } }; + return Arrays.asList(data); + } + + public UnionCollectionIT(PTypeFamily typeFamily, Pipeline pipeline) { + this.typeFamily = typeFamily; + this.pipeline = pipeline; + } + + @Test + public void unionMaterializeShouldNotThrowNPE() { + checkMaterialized(union.materialize()); + checkMaterialized(pipeline.materialize(union)); + } + + private void checkMaterialized(Iterable materialized) { + + List materializedValues = Lists.newArrayList(materialized.iterator()); + Collections.sort(materializedValues); + LOG.info("Materialized union: " + materializedValues); + + assertEquals(EXPECTED, materializedValues); + } + + @Test + public void unionWriteShouldNotThrowNPE() throws IOException { + + File outputPath1 = FileHelper.createOutputPath(); + File outputPath2 = FileHelper.createOutputPath(); + File outputPath3 = FileHelper.createOutputPath(); + + if (typeFamily == AvroTypeFamily.getInstance()) { + union.write(To.avroFile(outputPath1.getAbsolutePath())); + pipeline.write(union, To.avroFile(outputPath2.getAbsolutePath())); + + pipeline.run(); + + checkFileContents(outputPath1.getAbsolutePath()); + checkFileContents(outputPath2.getAbsolutePath()); + + } else { + + union.write(To.textFile(outputPath1.getAbsolutePath())); + pipeline.write(union, 
To.textFile(outputPath2.getAbsolutePath())); + pipeline.writeTextFile(union, outputPath3.getAbsolutePath()); + + pipeline.run(); + + checkFileContents(outputPath1.getAbsolutePath()); + checkFileContents(outputPath2.getAbsolutePath()); + checkFileContents(outputPath3.getAbsolutePath()); + } + + } + + private void checkFileContents(String filePath) throws IOException { + + List fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists + .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize() + .iterator()) : Lists.newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())) + .materialize().iterator()); + + Collections.sort(fileContentValues); + + LOG.info("Saved Union: " + fileContentValues); + assertEquals(EXPECTED, fileContentValues); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java new file mode 100644 index 0000000..df05aa5 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import org.apache.crunch.io.text.TextFileReaderFactory; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.writable.Writables; +import com.google.common.collect.Lists; +import com.google.common.io.Files; + + +public class CompositePathIterableIT { + + + @Test + public void testCreate_FilePresent() throws IOException{ + String inputFilePath = FileHelper.createTempCopyOf("set1.txt"); + Configuration conf = new Configuration(); + LocalFileSystem local = FileSystem.getLocal(conf); + + Iterable iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory(Writables.strings(), conf)); + + assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable)); + + } + + @Test + public void testCreate_DirectoryPresentButNoFiles() throws IOException{ + String inputFilePath = Files.createTempDir().getAbsolutePath(); + + Configuration conf = new Configuration(); + LocalFileSystem local = FileSystem.getLocal(conf); + + Iterable iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory(Writables.strings(), conf)); + + 
assertTrue(Lists.newArrayList(iterable).isEmpty()); + } + + @Test(expected=IOException.class) + public void testCreate_DirectoryNotPresent() throws IOException{ + File inputFileDir = Files.createTempDir(); + inputFileDir.delete(); + + // Sanity check + assertFalse(inputFileDir.exists()); + + Configuration conf = new Configuration(); + LocalFileSystem local = FileSystem.getLocal(conf); + + CompositePathIterable.create(local, new Path(inputFileDir.getAbsolutePath()), new TextFileReaderFactory(Writables.strings(), conf)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java new file mode 100644 index 0000000..e25aec4 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.avro; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.reflect.ReflectData; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson; +import org.apache.crunch.test.Person; +import org.apache.crunch.types.avro.Avros; +import com.google.common.collect.Lists; + +@SuppressWarnings("serial") +public class AvroFileSourceTargetIT implements Serializable { + + private transient File avroFile; + + @Before + public void setUp() throws IOException { + avroFile = File.createTempFile("test", ".avro"); + } + + @After + public void tearDown() { + avroFile.delete(); + } + + private void populateGenericFile(List genericRecords, + Schema schema) throws IOException { + FileOutputStream outputStream = new FileOutputStream(this.avroFile); + GenericDatumWriter genericDatumWriter = new GenericDatumWriter( + schema); + + DataFileWriter dataFileWriter = new DataFileWriter( + genericDatumWriter); + dataFileWriter.create(schema, outputStream); + + for (GenericRecord record : genericRecords) { + dataFileWriter.append(record); + } + + dataFileWriter.close(); + outputStream.close(); + + } + + @Test + public void testSpecific() throws IOException { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + 
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class); + PCollection genericCollection = pipeline.read(At.avroFile( + avroFile.getAbsolutePath(), Avros.records(Person.class))); + + List personList = Lists.newArrayList(genericCollection + .materialize()); + + Person expectedPerson = new Person(); + expectedPerson.setName("John Doe"); + expectedPerson.setAge(42); + + List siblingNames = Lists.newArrayList(); + siblingNames.add("Jimmy"); + siblingNames.add("Jane"); + expectedPerson.setSiblingnames(siblingNames); + + assertEquals(Lists.newArrayList(expectedPerson), + Lists.newArrayList(personList)); + } + + @Test + public void testGeneric() throws IOException { + String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", + "GenericPerson"); + Schema genericPersonSchema = new Schema.Parser() + .parse(genericSchemaJson); + GenericRecord savedRecord = new GenericData.Record(genericPersonSchema); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), + genericPersonSchema); + + Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class); + PCollection genericCollection = pipeline + .read(At.avroFile(avroFile.getAbsolutePath(), + Avros.generics(genericPersonSchema))); + + List recordList = Lists.newArrayList(genericCollection + .materialize()); + + assertEquals(Lists.newArrayList(savedRecord), + Lists.newArrayList(recordList)); + } + + @Test + public void testReflect() throws IOException { + Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class); + GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema); + savedRecord.put("name", "John Doe"); + populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema); + + Pipeline 
pipeline = new MRPipeline(AvroFileSourceTargetIT.class); + PCollection personCollection = pipeline.read(At.avroFile( + avroFile.getAbsolutePath(), Avros.reflects(PojoPerson.class))); + + List recordList = Lists.newArrayList(personCollection + .materialize()); + + assertEquals(1, recordList.size()); + PojoPerson person = recordList.get(0); + assertEquals("John Doe", person.getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java new file mode 100644 index 0000000..f5c9902 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.avro; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import org.junit.Test; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.avro.Avros; +import com.google.common.collect.Lists; + +public class AvroReflectIT implements Serializable { + + static class StringWrapper { + private String value; + + public StringWrapper() { + this(null); + } + + public StringWrapper(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public String toString() { + return String.format("", value); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((value == null) ? 
0 : value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + StringWrapper other = (StringWrapper) obj; + if (value == null) { + if (other.value != null) + return false; + } else if (!value.equals(other.value)) + return false; + return true; + } + + } + + @Test + public void testReflection() throws IOException { + Pipeline pipeline = new MRPipeline(AvroReflectIT.class); + PCollection stringWrapperCollection = pipeline + .readTextFile(FileHelper.createTempCopyOf("set1.txt")) + .parallelDo(new MapFn() { + + @Override + public StringWrapper map(String input) { + StringWrapper stringWrapper = new StringWrapper(); + stringWrapper.setValue(input); + return stringWrapper; + } + }, Avros.reflects(StringWrapper.class)); + + List stringWrappers = Lists + .newArrayList(stringWrapperCollection.materialize()); + + pipeline.done(); + + assertEquals(Lists.newArrayList(new StringWrapper("b"), + new StringWrapper("c"), new StringWrapper("a"), + new StringWrapper("e")), stringWrappers); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java new file mode 100644 index 0000000..33f1f0c --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.apache.crunch.types.writable.Writables.strings; +import static org.apache.crunch.types.writable.Writables.tableOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.test.Employee; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.crunch.types.writable.Writables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +public class AggregateIT { + + @Test public void testWritables() throws Exception { + Pipeline pipeline = new MRPipeline(AggregateIT.class); + String shakesInputPath = 
FileHelper.createTempCopyOf("shakes.txt"); + PCollection shakes = pipeline.readTextFile(shakesInputPath); + runMinMax(shakes, WritableTypeFamily.getInstance()); + pipeline.done(); + } + + @Test public void testAvro() throws Exception { + Pipeline pipeline = new MRPipeline(AggregateIT.class); + String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt"); + PCollection shakes = pipeline.readTextFile(shakesInputPath); + runMinMax(shakes, AvroTypeFamily.getInstance()); + pipeline.done(); + } + + @Test public void testInMemoryAvro() throws Exception { + PCollection someText = MemPipeline.collectionOf( + "first line", "second line", "third line"); + runMinMax(someText, AvroTypeFamily.getInstance()); + } + + public static void runMinMax(PCollection shakes, PTypeFamily family) throws Exception { + PCollection lengths = shakes.parallelDo(new MapFn() { + @Override + public Integer map(String input) { + return input.length(); + } + }, family.ints()); + PCollection negLengths = lengths.parallelDo(new MapFn() { + @Override + public Integer map(Integer input) { + return -input; + } + }, family.ints()); + Integer maxLengths = Iterables.getFirst(Aggregate.max(lengths).materialize(), null); + Integer minLengths = Iterables.getFirst(Aggregate.min(negLengths).materialize(), null); + assertTrue(maxLengths != null); + assertTrue(minLengths != null); + assertEquals(maxLengths.intValue(), -minLengths.intValue()); + } + + private static class SplitFn extends MapFn> { + @Override + public Pair map(String input) { + String[] p = input.split("\\s+"); + return Pair.of(p[0], p[1]); + } + } + + @Test public void testCollectUrls() throws Exception { + Pipeline p = new MRPipeline(AggregateIT.class); + String urlsInputPath = FileHelper.createTempCopyOf("urls.txt"); + PTable> urls = Aggregate.collectValues( + p.readTextFile(urlsInputPath) + .parallelDo(new SplitFn(), tableOf(strings(), strings()))); + for (Pair> e : urls.materialize()) { + String key = e.first(); + int expectedSize = 0; + if 
("www.A.com".equals(key)) { + expectedSize = 4; + } else if ("www.B.com".equals(key) || "www.F.com".equals(key)) { + expectedSize = 2; + } else if ("www.C.com".equals(key) || "www.D.com".equals(key) || "www.E.com".equals(key)) { + expectedSize = 1; + } + assertEquals("Checking key = " + key, expectedSize, e.second().size()); + p.done(); + } + } + + @Test public void testTopN() throws Exception { + PTableType ptype = Avros.tableOf(Avros.strings(), Avros.ints()); + PTable counts = MemPipeline.typedTableOf(ptype, "foo", 12, "bar", 17, "baz", 29); + + PTable top2 = Aggregate.top(counts, 2, true); + assertEquals(ImmutableList.of(Pair.of("baz", 29), Pair.of("bar", 17)), top2.materialize()); + + PTable bottom2 = Aggregate.top(counts, 2, false); + assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize()); + } + + @Test + public void testCollectValues_Writables() throws IOException { + Pipeline pipeline = new MRPipeline(AggregateIT.class); + Map> collectionMap = pipeline + .readTextFile(FileHelper.createTempCopyOf("set2.txt")) + .parallelDo(new MapStringToTextPair(), + Writables.tableOf(Writables.ints(), Writables.writables(Text.class)) + ).collectValues().materializeToMap(); + + assertEquals(1, collectionMap.size()); + + assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")), + collectionMap.get(1)); + } + + @Test + public void testCollectValues_Avro() throws IOException { + + MapStringToEmployeePair mapFn = new MapStringToEmployeePair(); + Pipeline pipeline = new MRPipeline(AggregateIT.class); + Map> collectionMap = pipeline + .readTextFile(FileHelper.createTempCopyOf("set2.txt")) + .parallelDo(mapFn, + Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues() + .materializeToMap(); + + assertEquals(1, collectionMap.size()); + + Employee empC = mapFn.map("c").second(); + Employee empD = mapFn.map("d").second(); + Employee empA = mapFn.map("a").second(); + + assertEquals(Lists.newArrayList(empC, 
empD, empA), + collectionMap.get(1)); + } + + private static class MapStringToTextPair extends MapFn> { + @Override + public Pair map(String input) { + return Pair.of(1, new Text(input)); + } + } + + private static class MapStringToEmployeePair extends MapFn> { + @Override + public Pair map(String input) { + Employee emp = new Employee(); + emp.setName(input); + emp.setSalary(0); + emp.setDepartment(""); + return Pair.of(1, emp); + } + } + + public static class PojoText { + private String value; + + public PojoText() { + this(""); + } + + public PojoText(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public String toString() { + return String.format("PojoText<%s>", this.value); + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PojoText other = (PojoText) obj; + if (value == null) { + if (other.value != null) + return false; + } else if (!value.equals(other.value)) + return false; + return true; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java new file mode 100644 index 0000000..0aece4b --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.apache.crunch.types.avro.Avros.ints; +import static org.apache.crunch.types.avro.Avros.records; +import static org.apache.crunch.types.avro.Avros.strings; +import static junit.framework.Assert.assertEquals; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.test.Person; +import com.google.common.collect.Lists; + +/** + * Test sorting Avro types by selected inner field + */ +public class AvroTypeSortIT implements Serializable { + + private static final long serialVersionUID = 1344118240353796561L; + + private transient File avroFile; + + @Before + public void setUp() throws IOException { + avroFile = File.createTempFile("avrotest", ".avro"); + } + + @After + public void tearDown() { + avroFile.delete(); + } + + @Test + public void testSortAvroTypesBySelectedFields() throws Exception { + + MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class); + + Person ccc10 = createPerson("CCC", 10); + 
Person bbb20 = createPerson("BBB", 20); + Person aaa30 = createPerson("AAA", 30); + + writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile); + + PCollection unsorted = pipeline.read(At.avroFile( + avroFile.getAbsolutePath(), records(Person.class))); + + // Sort by Name + MapFn nameExtractor = new MapFn() { + + @Override + public String map(Person input) { + return input.getName().toString(); + } + }; + + PCollection sortedByName = unsorted + .by(nameExtractor, strings()).groupByKey().ungroup().values(); + + List sortedByNameList = Lists.newArrayList(sortedByName + .materialize()); + + assertEquals(3, sortedByNameList.size()); + assertEquals(aaa30, sortedByNameList.get(0)); + assertEquals(bbb20, sortedByNameList.get(1)); + assertEquals(ccc10, sortedByNameList.get(2)); + + // Sort by Age + + MapFn ageExtractor = new MapFn() { + + @Override + public Integer map(Person input) { + return input.getAge(); + } + }; + + PCollection sortedByAge = unsorted.by(ageExtractor, ints()) + .groupByKey().ungroup().values(); + + List sortedByAgeList = Lists.newArrayList(sortedByAge + .materialize()); + + assertEquals(3, sortedByAgeList.size()); + assertEquals(ccc10, sortedByAgeList.get(0)); + assertEquals(bbb20, sortedByAgeList.get(1)); + assertEquals(aaa30, sortedByAgeList.get(2)); + + pipeline.done(); + } + + private void writeAvroFile(List people, File avroFile) + throws IOException { + + FileOutputStream outputStream = new FileOutputStream(avroFile); + SpecificDatumWriter writer = new SpecificDatumWriter( + Person.class); + + DataFileWriter dataFileWriter = new DataFileWriter( + writer); + dataFileWriter.create(Person.SCHEMA$, outputStream); + for (Person person : people) { + dataFileWriter.append(person); + } + dataFileWriter.close(); + outputStream.close(); + } + + private Person createPerson(String name, int age) throws IOException { + + Person person = new Person(); + person.setAge(age); + person.setName(name); + List siblingNames = Lists.newArrayList(); + 
person.setSiblingnames(siblingNames); + + return person; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java new file mode 100644 index 0000000..cc67257 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Collection; +import java.util.List; + +import org.junit.Test; + +import org.apache.crunch.CombineFn; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.fn.MapKeysFn; +import org.apache.crunch.fn.MapValuesFn; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import com.google.common.base.Splitter; +import com.google.common.io.Files; + +public class CogroupIT { + + private static class WordSplit extends DoFn> { + @Override + public void process(String input, Emitter> emitter) { + for (String word : Splitter.on(' ').split(input)) { + emitter.emit(Pair.of(word, 1L)); + } + } + } + + public static PTable join(PCollection w1, + PCollection w2, PTypeFamily ptf) { + PTableType ntt = ptf.tableOf(ptf.strings(), ptf.longs()); + PTable ws1 = w1.parallelDo("ws1", new WordSplit(), ntt); + PTable ws2 = w2.parallelDo("ws2", new WordSplit(), ntt); + PTable, Collection>> cg = Cogroup.cogroup(ws1, ws2); + PTable sums = cg.parallelDo( + "wc", + new MapValuesFn, Collection>, Long>() { + @Override + public Long map(Pair, Collection> v) { + long sum = 0L; + for (Long value : v.first()) { + sum += value; + } + for (Long value : v.second()) { + sum += value; + } + return sum; + } + }, ntt); + return sums.parallelDo("firstletters", new MapKeysFn() { + @Override + public String map(String k1) { + if (k1.length() > 0) { + return k1.substring(0, 
1).toLowerCase(); + } else { + return ""; + } + } + }, ntt).groupByKey().combineValues(CombineFn.SUM_LONGS()); + } + + @Test + public void testWritableJoin() throws Exception { + run(new MRPipeline(CogroupIT.class), WritableTypeFamily.getInstance()); + } + + @Test + public void testAvroJoin() throws Exception { + run(new MRPipeline(CogroupIT.class), AvroTypeFamily.getInstance()); + } + + public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { + String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt"); + String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt"); + File output = FileHelper.createOutputPath(); + + PCollection shakespeare = pipeline.read(From.textFile(shakesInputPath)); + PCollection maugham = pipeline.read(From.textFile(maughamInputPath)); + pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath()); + pipeline.done(); + + File outputFile = new File(output, "part-r-00000"); + List lines = Files.readLines(outputFile, Charset.defaultCharset()); + boolean passed = false; + for (String line : lines) { + if (line.equals("j\t705")) { + passed = true; + break; + } + } + assertTrue(passed); + + output.deleteOnExit(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/SetIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java new file mode 100644 index 0000000..ab09bef --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Tuple3; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import com.google.common.collect.Lists; + +@RunWith(value = Parameterized.class) +public class SetIT { + + private PTypeFamily typeFamily; + + private Pipeline pipeline; + private PCollection set1; + private PCollection set2; + + public SetIT(PTypeFamily typeFamily) { + this.typeFamily = typeFamily; + } + + @Parameters + public static Collection data() { + Object[][] data = new Object[][] { + { WritableTypeFamily.getInstance() }, + { AvroTypeFamily.getInstance() } + }; + return Arrays.asList(data); + } + + @Before + public void setUp() throws IOException { + 
String set1InputPath = FileHelper.createTempCopyOf("set1.txt"); + String set2InputPath = FileHelper.createTempCopyOf("set2.txt"); + pipeline = new MRPipeline(SetIT.class); + set1 = pipeline.read(At.textFile(set1InputPath, typeFamily.strings())); + set2 = pipeline.read(At.textFile(set2InputPath, typeFamily.strings())); + } + + @After + public void tearDown() { + pipeline.done(); + } + + @Test + public void testDifference() throws Exception { + PCollection difference = Set.difference(set1, set2); + assertEquals(Lists.newArrayList("b", "e"), + Lists.newArrayList(difference.materialize())); + } + + @Test + public void testIntersection() throws Exception { + PCollection intersection = Set.intersection(set1, set2); + assertEquals(Lists.newArrayList("a", "c"), + Lists.newArrayList(intersection.materialize())); + } + + @Test + public void testComm() throws Exception { + PCollection> comm = Set.comm(set1, set2); + Iterator> i = comm.materialize().iterator(); + checkEquals(null, null, "a", i.next()); + checkEquals("b", null, null, i.next()); + checkEquals(null, null, "c", i.next()); + checkEquals(null, "d", null, i.next()); + checkEquals("e", null, null, i.next()); + assertFalse(i.hasNext()); + } + + private void checkEquals(String s1, String s2, String s3, + Tuple3 tuple) { + assertEquals("first string", s1, tuple.first()); + assertEquals("second string", s2, tuple.second()); + assertEquals("third string", s3, tuple.third()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/SortIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java new file mode 100644 index 0000000..616ec15 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.apache.crunch.lib.Sort.ColumnOrder.by; +import static org.apache.crunch.lib.Sort.Order.ASCENDING; +import static org.apache.crunch.lib.Sort.Order.DESCENDING; +import static org.apache.crunch.test.StringWrapper.wrap; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; +import org.apache.crunch.TupleN; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.Sort; +import org.apache.crunch.lib.Sort.ColumnOrder; +import org.apache.crunch.lib.Sort.Order; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.test.StringWrapper; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.WritableTypeFamily; +import 
org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class SortIT implements Serializable { + + @Test + public void testWritableSortAsc() throws Exception { + runSingle(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), Order.ASCENDING, + "A\tand this text as well"); + } + + @Test + public void testWritableSortDesc() throws Exception { + runSingle(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), Order.DESCENDING, + "B\tthis doc has some text"); + } + + @Test + public void testWritableSortAscDesc() throws Exception { + runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), "A", "this doc has this text"); + } + + @Test + public void testWritableSortSecondDescFirstDesc() throws Exception { + runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(2, DESCENDING), + by(1, ASCENDING), "A", "this doc has this text"); + } + + @Test + public void testWritableSortTripleAscDescAsc() throws Exception { + runTriple(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc"); + } + + @Test + public void testWritableSortQuadAscDescAscDesc() throws Exception { + runQuad(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has"); + } + + @Test + public void testWritableSortTupleNAscDesc() throws Exception { + runTupleN(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), new ColumnOrder[] { + by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" }); + } + + @Test + public void testWritableSortTable() throws Exception { + runTable(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), "A"); + } + + @Test + public void testAvroSortAsc() throws Exception { + runSingle(new MRPipeline(SortIT.class), 
AvroTypeFamily.getInstance(), Order.ASCENDING, + "A\tand this text as well"); + } + + @Test + public void testAvroSortDesc() throws Exception { + runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.DESCENDING, + "B\tthis doc has some text"); + } + + @Test + public void testAvroSortPairAscAsc() throws Exception { + runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), "A", "this doc has this text"); + } + + @Test + @Ignore("Avro sorting only works in field order at the moment") + public void testAvroSortPairSecondAscFirstDesc() throws Exception { + runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(2, DESCENDING), + by(1, ASCENDING), "A", "this doc has this text"); + } + + @Test + public void testAvroSortTripleAscDescAsc() throws Exception { + runTriple(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc"); + } + + @Test + public void testAvroSortQuadAscDescAscDesc() throws Exception { + runQuad(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), + by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has"); + } + + @Test + public void testAvroSortTupleNAscDesc() throws Exception { + runTupleN(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), + new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", + "this doc has this text" }); + } + + @Test + public void testAvroReflectSortPair() throws IOException { + Pipeline pipeline = new MRPipeline(SortIT.class); + PCollection> sorted = pipeline + .readTextFile(FileHelper.createTempCopyOf("set2.txt")) + .parallelDo(new MapFn>() { + + @Override + public Pair map(String input) { + return Pair.of(input, wrap(input)); + } + }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class))).sort(true); + + List> expected = Lists.newArrayList(); + 
expected.add(Pair.of("a", wrap("a"))); + expected.add(Pair.of("c", wrap("c"))); + expected.add(Pair.of("d", wrap("d"))); + + assertEquals(expected, Lists.newArrayList(sorted.materialize())); + } + + @Test + public void testAvroReflectSortTable() throws IOException { + Pipeline pipeline = new MRPipeline(SortIT.class); + PTable unsorted = pipeline.readTextFile( + FileHelper.createTempCopyOf("set2.txt")).parallelDo( + new MapFn>() { + + @Override + public Pair map(String input) { + return Pair.of(input, wrap(input)); + } + }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class))); + + PTable sorted = Sort.sort(unsorted); + + List> expected = Lists.newArrayList(); + expected.add(Pair.of("a", wrap("a"))); + expected.add(Pair.of("c", wrap("c"))); + expected.add(Pair.of("d", wrap("d"))); + + assertEquals(expected, Lists.newArrayList(sorted.materialize())); + } + + @Test + public void testAvroSortTable() throws Exception { + runTable(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), "A"); + } + + private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine) + throws IOException { + String inputPath = FileHelper.createTempCopyOf("docs.txt"); + + PCollection input = pipeline.readTextFile(inputPath); + // following turns the input from Writables to required type family + PCollection input2 = input.parallelDo(new DoFn() { + @Override + public void process(String input, Emitter emitter) { + emitter.emit(input); + } + }, typeFamily.strings()); + PCollection sorted = Sort.sort(input2, order); + Iterable lines = sorted.materialize(); + + assertEquals(firstLine, lines.iterator().next()); + pipeline.done(); // TODO: finally + } + + private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, + ColumnOrder second, String firstField, String secondField) throws IOException { + String inputPath = FileHelper.createTempCopyOf("docs.txt"); + + PCollection input = pipeline.readTextFile(inputPath); + 
PCollection> kv = input.parallelDo( + new DoFn>() { + @Override + public void process(String input, Emitter> emitter) { + String[] split = input.split("[\t]+"); + emitter.emit(Pair.of(split[0], split[1])); + } + }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings())); + PCollection> sorted = Sort.sortPairs(kv, first, second); + Iterable> lines = sorted.materialize(); + Pair l = lines.iterator().next(); + assertEquals(firstField, l.first()); + assertEquals(secondField, l.second()); + pipeline.done(); + } + + private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, + ColumnOrder second, ColumnOrder third, String firstField, String secondField, + String thirdField) throws IOException { + String inputPath = FileHelper.createTempCopyOf("docs.txt"); + + PCollection input = pipeline.readTextFile(inputPath); + PCollection> kv = input.parallelDo( + new DoFn>() { + @Override + public void process(String input, Emitter> emitter) { + String[] split = input.split("[\t ]+"); + int len = split.length; + emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len])); + } + }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings())); + PCollection> sorted = Sort.sortTriples(kv, first, second, third); + Iterable> lines = sorted.materialize(); + Tuple3 l = lines.iterator().next(); + assertEquals(firstField, l.first()); + assertEquals(secondField, l.second()); + assertEquals(thirdField, l.third()); + pipeline.done(); + } + + private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, + ColumnOrder second, ColumnOrder third, ColumnOrder fourth, String firstField, + String secondField, String thirdField, String fourthField) throws IOException { + String inputPath = FileHelper.createTempCopyOf("docs.txt"); + + PCollection input = pipeline.readTextFile(inputPath); + PCollection> kv = input.parallelDo( + new DoFn>() { + @Override + public void process(String input, Emitter> emitter) { + String[] 
split = input.split("[\t ]+"); + int len = split.length; + emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len])); + } + }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), + typeFamily.strings())); + PCollection> sorted = Sort.sortQuads(kv, first, second, + third, fourth); + Iterable> lines = sorted.materialize(); + Tuple4 l = lines.iterator().next(); + assertEquals(firstField, l.first()); + assertEquals(secondField, l.second()); + assertEquals(thirdField, l.third()); + assertEquals(fourthField, l.fourth()); + pipeline.done(); + } + + private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders, + String[] fields) throws IOException { + String inputPath = FileHelper.createTempCopyOf("docs.txt"); + + PCollection input = pipeline.readTextFile(inputPath); + PType[] types = new PType[orders.length]; + Arrays.fill(types, typeFamily.strings()); + PCollection kv = input.parallelDo(new DoFn() { + @Override + public void process(String input, Emitter emitter) { + String[] split = input.split("[\t]+"); + emitter.emit(new TupleN(split)); + } + }, typeFamily.tuples(types)); + PCollection sorted = Sort.sortTuples(kv, orders); + Iterable lines = sorted.materialize(); + TupleN l = lines.iterator().next(); + int i = 0; + for (String field : fields) { + assertEquals(field, l.get(i++)); + } + pipeline.done(); + } + + private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) + throws IOException { + String inputPath = FileHelper.createTempCopyOf("docs.txt"); + + PCollection input = pipeline.readTextFile(inputPath); + PTable table = input.parallelDo(new DoFn>() { + @Override + public void process(String input, Emitter> emitter) { + String[] split = input.split("[\t]+"); + emitter.emit(Pair.of(split[0], split[1])); + } + }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings())); + + PTable sorted = Sort.sort(table); + Iterable> lines = sorted.materialize(); + Pair l 
= lines.iterator().next(); + assertEquals(firstKey, l.first()); + pipeline.done(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java new file mode 100644 index 0000000..69f0717 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import static junit.framework.Assert.assertEquals; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.mapred.AvroJob; +import org.apache.avro.specific.SpecificDatumWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.Person.Builder; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.avro.SafeAvroSerialization; +import com.google.common.collect.Lists; + +/** + * Test {@link SafeAvroSerialization} with Specific Avro types + */ +public class SpecificAvroGroupByIT implements Serializable { + + private static final long serialVersionUID = 1344118240353796561L; + + private transient File avroFile; + + @Before + public void setUp() throws IOException { + avroFile = File.createTempFile("avrotest", ".avro"); + } + + @After + public void tearDown() { + avroFile.delete(); + } + + @Test + public void testGrouByWithSpecificAvroType() throws Exception { + + MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class); + + testSpecificAvro(pipeline); + } + + @Test + public void testGrouByOnSpecificAvroButReflectionDatumReader() + throws Exception { + MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class); + + // https://issues.apache.org/jira/browse/AVRO-1046 resolves + // the ClassCastException when reading specific Avro types with + // ReflectDatumReader + + pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, + true); + + testSpecificAvro(pipeline); + } + + public void testSpecificAvro(MRPipeline pipeline) 
throws Exception { + + createPersonAvroFile(avroFile); + + PCollection unsorted = pipeline.read(At.avroFile( + avroFile.getAbsolutePath(), Avros.records(Person.class))); + + PTable sorted = unsorted + .parallelDo(new MapFn>() { + + @Override + public Pair map(Person input) { + String key = input.getName().toString(); + return Pair.of(key, input); + + } + }, Avros.tableOf(Avros.strings(), Avros.records(Person.class))) + .groupByKey().ungroup(); + + List> outputPersonList = Lists.newArrayList(sorted + .materialize()); + + assertEquals(1, outputPersonList.size()); + assertEquals(String.class, outputPersonList.get(0).first().getClass()); + assertEquals(Person.class, outputPersonList.get(0).second().getClass()); + + pipeline.done(); + } + + private void createPersonAvroFile(File avroFile) throws IOException { + + Builder person = Person.newBuilder(); + person.setAge(40); + person.setName("Bob"); + List siblingNames = Lists.newArrayList(); + siblingNames.add("Bob" + "1"); + siblingNames.add("Bob" + "2"); + person.setSiblingnames(siblingNames); + + FileOutputStream outputStream = new FileOutputStream(avroFile); + SpecificDatumWriter writer = new SpecificDatumWriter( + Person.class); + + DataFileWriter dataFileWriter = new DataFileWriter( + writer); + dataFileWriter.create(Person.SCHEMA$, outputStream); + dataFileWriter.append(person.build()); + dataFileWriter.close(); + outputStream.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java new file mode 100644 index 0000000..8016a24 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class FullOuterJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable> lines) { + boolean passed1 = false; + boolean passed2 = false; + boolean passed3 = false; + for (Pair line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first()) && 10 == line.second()) { + passed2 = true; + } + if ("Montparnasse.".equals(line.first()) && 2 == line.second()) { + passed3 = true; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn getJoinFn(PTypeFamily typeFamily) { + return new FullOuterJoinFn(typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java new file mode 100644 index 0000000..97220ac --- /dev/null +++ 
b/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class InnerJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable> lines) { + boolean passed1 = false; + boolean passed2 = true; + boolean passed3 = true; + for (Pair line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first())) { + passed2 = false; + } + if ("Montparnasse.".equals(line.first())) { + passed3 = false; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn getJoinFn(PTypeFamily typeFamily) { + return new InnerJoinFn(typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java 
b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java new file mode 100644 index 0000000..c5a9f39 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib.join; + +import java.io.IOException; +import java.io.Serializable; + +import org.junit.Test; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.lib.Join; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; + +public abstract class JoinTester implements Serializable { + private static class WordSplit extends DoFn { + @Override + public void process(String input, Emitter emitter) { + for (String word : input.split("\\s+")) { + emitter.emit(word); + } + } + } + + protected PTable join(PCollection w1, PCollection w2, + PTypeFamily ptf) { + PTableType ntt = ptf.tableOf(ptf.strings(), ptf.longs()); + PTable ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings())); + PTable ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings())); + + PTable> join = Join.join(ws1, ws2, getJoinFn(ptf)); + + PTable sums = join.parallelDo("cnt", + new DoFn>, Pair>() { + @Override + public void process(Pair> input, + Emitter> emitter) { + Pair pair = input.second(); + long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? 
pair.second() : 0); + emitter.emit(Pair.of(input.first(), sum)); + } + }, ntt); + + return sums; + } + + protected void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { + String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt"); + String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt"); + + PCollection shakespeare = pipeline.readTextFile(shakesInputPath); + PCollection maugham = pipeline.readTextFile(maughamInputPath); + PTable joined = join(shakespeare, maugham, typeFamily); + Iterable> lines = joined.materialize(); + + assertPassed(lines); + + pipeline.done(); + } + + @Test + public void testWritableJoin() throws Exception { + run(new MRPipeline(InnerJoinIT.class), WritableTypeFamily.getInstance()); + } + + @Test + public void testAvroJoin() throws Exception { + run(new MRPipeline(InnerJoinIT.class), AvroTypeFamily.getInstance()); + } + + /** + * Used to check that the result of the join makes sense. + * + * @param lines The result of the join. + */ + public abstract void assertPassed(Iterable> lines); + + /** + * @return The JoinFn to use. + */ + protected abstract JoinFn getJoinFn(PTypeFamily typeFamily); +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java new file mode 100644 index 0000000..aafe1c9 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class LeftOuterJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable> lines) { + boolean passed1 = false; + boolean passed2 = false; + boolean passed3 = true; + for (Pair line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first()) && 10 == line.second()) { + passed2 = true; + } + if ("Montparnasse.".equals(line.first())) { + passed3 = false; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn getJoinFn(PTypeFamily typeFamily) { + return new LeftOuterJoinFn(typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java new file mode 100644 index 0000000..baf3d4f --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.junit.Test; + +import org.apache.crunch.FilterFn; +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.test.FileHelper; +import org.apache.crunch.types.writable.Writables; +import com.google.common.collect.Lists; + +public class MapsideJoinIT { + + private static class LineSplitter extends MapFn> { + + @Override + public Pair map(String input) { + String[] fields = input.split("\\|"); + return Pair.of(Integer.parseInt(fields[0]), fields[1]); + } + + } + + private static class NegativeFilter extends FilterFn> { + + @Override + public boolean accept(Pair input) { + return false; + } + + } + + @Test(expected = CrunchRuntimeException.class) + public void testNonMapReducePipeline() { + runMapsideJoin(MemPipeline.getInstance()); + } + + @Test + public void 
testMapsideJoin_RightSideIsEmpty() throws IOException { + MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class); + PTable customerTable = readTable(pipeline, "customers.txt"); + PTable orderTable = readTable(pipeline, "orders.txt"); + + PTable filteredOrderTable = orderTable.parallelDo(new NegativeFilter(), + orderTable.getPTableType()); + + PTable> joined = MapsideJoin.join(customerTable, + filteredOrderTable); + + List>> materializedJoin = Lists.newArrayList(joined + .materialize()); + + assertTrue(materializedJoin.isEmpty()); + + } + + @Test + public void testMapsideJoin() throws IOException { + runMapsideJoin(new MRPipeline(MapsideJoinIT.class)); + } + + private void runMapsideJoin(Pipeline pipeline) { + PTable customerTable = readTable(pipeline, "customers.txt"); + PTable orderTable = readTable(pipeline, "orders.txt"); + + PTable> joined = MapsideJoin.join(customerTable, orderTable); + + List>> expectedJoinResult = Lists.newArrayList(); + expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes"))); + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper"))); + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger"))); + expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush"))); + + List>> joinedResultList = Lists.newArrayList(joined + .materialize()); + Collections.sort(joinedResultList); + + assertEquals(expectedJoinResult, joinedResultList); + } + + private static PTable readTable(Pipeline pipeline, String filename) { + try { + return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable", + new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java new file mode 100644 index 0000000..5fcef72 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib.join; + +import static org.apache.crunch.types.avro.Avros.records; +import static org.apache.crunch.types.avro.Avros.strings; +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.Employee; +import org.apache.crunch.test.Person; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class MultiAvroSchemaJoinIT { + + private File personFile; + private File employeeFile; + + @Before + public void setUp() throws Exception { + this.personFile = File.createTempFile("person", ".avro"); + this.employeeFile = File.createTempFile("employee", ".avro"); + + DatumWriter pdw = new SpecificDatumWriter(); + DataFileWriter pfw = new DataFileWriter(pdw); + pfw.create(Person.SCHEMA$, personFile); + Person p1 = new Person(); + p1.setName("Josh"); + p1.setAge(19); + p1.setSiblingnames(ImmutableList.of("Kate", "Mike")); + pfw.append(p1); + Person p2 = new Person(); + p2.setName("Kate"); + p2.setAge(17); + p2.setSiblingnames(ImmutableList.of("Josh", "Mike")); + pfw.append(p2); + Person p3 = new Person(); + p3.setName("Mike"); + p3.setAge(12); + p3.setSiblingnames(ImmutableList.of("Josh", "Kate")); + pfw.append(p3); + pfw.close(); + + DatumWriter edw = new SpecificDatumWriter(); + DataFileWriter efw = new DataFileWriter(edw); + efw.create(Employee.SCHEMA$, employeeFile); + Employee e1 = new Employee(); + e1.setName("Kate"); + 
e1.setSalary(100000); + e1.setDepartment("Marketing"); + efw.append(e1); + efw.close(); + } + + @After + public void tearDown() throws Exception { + personFile.delete(); + employeeFile.delete(); + } + + public static class NameFn extends MapFn { + @Override + public String map(K input) { + Schema s = input.getSchema(); + Schema.Field f = s.getField("name"); + return input.get(f.pos()).toString(); + } + } + + @Test + public void testJoin() throws Exception { + Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class); + PCollection people = p.read(From.avroFile(personFile.getAbsolutePath(), records(Person.class))); + PCollection employees = p.read(From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class))); + + Iterable> result = people.by(new NameFn(), strings()) + .join(employees.by(new NameFn(), strings())).values().materialize(); + List> v = Lists.newArrayList(result); + assertEquals(1, v.size()); + assertEquals("Kate", v.get(0).first().getName().toString()); + assertEquals("Kate", v.get(0).second().getName().toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java new file mode 100644 index 0000000..a3bb122 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class RightOuterJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable> lines) { + boolean passed1 = false; + boolean passed2 = true; + boolean passed3 = false; + for (Pair line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first())) { + passed2 = false; + } + if ("Montparnasse.".equals(line.first()) && 2 == line.second()) { + passed3 = true; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn getJoinFn(PTypeFamily typeFamily) { + return new RightOuterJoinFn(typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/customers.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/customers.txt b/crunch/src/it/resources/customers.txt new file mode 100644 index 0000000..98f3f3d --- /dev/null +++ b/crunch/src/it/resources/customers.txt @@ -0,0 +1,4 @@ +111|John Doe +222|Jane Doe +333|Someone Else +444|Has No Orders \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/docs.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/docs.txt b/crunch/src/it/resources/docs.txt new file mode 
100644 index 0000000..90a3f65 --- /dev/null +++ b/crunch/src/it/resources/docs.txt @@ -0,0 +1,6 @@ +A this doc has this text +A and this text as well +A but also this +B this doc has some text +B but not as much as the last +B doc http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/employee.avro ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/employee.avro b/crunch/src/it/resources/employee.avro new file mode 100644 index 0000000..3563df9 --- /dev/null +++ b/crunch/src/it/resources/employee.avro @@ -0,0 +1,9 @@ +{ +"namespace": "com.cloudera.crunch.test", +"name": "Employee", +"type": "record", +"fields": [ + {"name": "name", "type": ["string", "null"] }, + {"name": "salary", "type": "int"}, + {"name": "department", "type": ["string", "null"] } ] +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/emptyTextFile.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/emptyTextFile.txt b/crunch/src/it/resources/emptyTextFile.txt new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/letters.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/letters.txt b/crunch/src/it/resources/letters.txt new file mode 100644 index 0000000..916bfc9 --- /dev/null +++ b/crunch/src/it/resources/letters.txt @@ -0,0 +1,2 @@ +a +bb \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/log4j.properties b/crunch/src/it/resources/log4j.properties new file mode 100644 index 0000000..c8173d7 --- /dev/null +++ b/crunch/src/it/resources/log4j.properties @@ -0,0 
+1,11 @@ +# ***** Set the org.apache.crunch logger level to INFO and its only appender to A. +log4j.logger.org.apache.crunch=info, A + +# Log warnings on Hadoop for the local runner when testing +log4j.logger.org.apache.hadoop=warn, A + +# ***** A is set to be a ConsoleAppender. +log4j.appender.A=org.apache.log4j.ConsoleAppender +# ***** A uses PatternLayout. +log4j.appender.A.layout=org.apache.log4j.PatternLayout +log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n