crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [40/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:42 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
new file mode 100644
index 0000000..5292353
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test {@link org.apache.crunch.types.avro.SafeAvroSerialization} with Specific Avro types
+ */
+public class SpecificAvroGroupByIT implements Serializable {
+
+  private static final long serialVersionUID = 1344118240353796561L;
+
+  /** Temporary Avro input file; created in {@link #setUp()} and deleted in {@link #tearDown()}. */
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("avrotest", ".avro");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  @Test
+  public void testGrouByWithSpecificAvroType() throws Exception {
+    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration());
+    testSpecificAvro(pipeline);
+  }
+
+  /**
+   * Writes one {@link Person} record to the temporary Avro file, keys it by name, sends it
+   * through {@code groupByKey().ungroup()} and checks that exactly one pair comes back whose
+   * key is a {@link String} and whose value is a {@link Person}.
+   *
+   * @param pipeline the pipeline to read with; {@code done()} is called on it at the end
+   * @throws Exception if writing the input file or running the pipeline fails
+   */
+  public void testSpecificAvro(MRPipeline pipeline) throws Exception {
+
+    createPersonAvroFile(avroFile);
+
+    PCollection<Person> unsorted = pipeline.read(
+        At.avroFile(avroFile.getAbsolutePath(), Avros.records(Person.class)));
+
+    PTable<String, Person> sorted = unsorted.parallelDo(new MapFn<Person, Pair<String, Person>>() {
+
+      @Override
+      public Pair<String, Person> map(Person input) {
+        String key = input.name.toString();
+        return Pair.of(key, input);
+
+      }
+    }, Avros.tableOf(Avros.strings(), Avros.records(Person.class))).groupByKey().ungroup();
+
+    List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted.materialize());
+
+    assertEquals(1, outputPersonList.size());
+    assertEquals(String.class, outputPersonList.get(0).first().getClass());
+    assertEquals(Person.class, outputPersonList.get(0).second().getClass());
+
+    pipeline.done();
+  }
+
+  /**
+   * Writes a single {@link Person} ("Bob", age 40, siblings "Bob1" and "Bob2") to the given
+   * file in Avro data-file format.
+   *
+   * @param avroFile the file to write to
+   * @throws IOException if the file cannot be opened or written
+   */
+  private void createPersonAvroFile(File avroFile) throws IOException {
+
+    Person person = new Person();
+    person.age = 40;
+    person.name = "Bob";
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Bob" + "1");
+    siblingNames.add("Bob" + "2");
+    person.siblingnames = siblingNames;
+
+    FileOutputStream outputStream = new FileOutputStream(avroFile);
+    try {
+      SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(Person.class);
+
+      DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(writer);
+      dataFileWriter.create(Person.SCHEMA$, outputStream);
+      dataFileWriter.append(person);
+      dataFileWriter.close();
+    } finally {
+      // Release the file handle even if create/append/close above throws.
+      outputStream.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
new file mode 100644
index 0000000..63d594d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class FullOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new FullOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
new file mode 100644
index 0000000..4759050
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class InnerJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new InnerJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java
new file mode 100644
index 0000000..3e8ffda
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Base class for the join integration tests. It counts the whitespace-separated words of
+ * two text resources, joins the two count tables with the {@link JoinFn} supplied by the
+ * subclass, and hands the per-word sums to {@link #assertPassed(Iterable)}.
+ */
+public abstract class JoinTester implements Serializable {
+  /** Splits each input line on runs of whitespace and emits every resulting token. */
+  private static class WordSplit extends DoFn<String, String> {
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      for (String word : input.split("\\s+")) {
+        emitter.emit(word);
+      }
+    }
+  }
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Counts the words in each collection, joins the two count tables by word using
+   * {@link #getJoinFn(PTypeFamily)}, and sums the left and right counts per word. A side
+   * that is {@code null} in the join output contributes 0 to the sum.
+   *
+   * @param w1 lines of the left input
+   * @param w2 lines of the right input
+   * @param ptf type family used for all intermediate and output types
+   * @return a table mapping each joined word to the sum of its two counts
+   */
+  protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) {
+    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
+    PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
+    PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
+
+    PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
+
+    PTable<String, Long> sums = join.parallelDo("cnt",
+        new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
+      @Override
+      public void process(Pair<String, Pair<Long, Long>> input, Emitter<Pair<String, Long>> emitter) {
+        Pair<Long, Long> pair = input.second();
+        long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? pair.second() : 0);
+        emitter.emit(Pair.of(input.first(), sum));
+      }
+    }, ntt);
+
+    return sums;
+  }
+
+  /**
+   * Copies {@code shakes.txt} and {@code maugham.txt} into the temporary directory, joins
+   * their word counts, checks the materialized result with {@link #assertPassed(Iterable)}
+   * and finally calls {@code done()} on the pipeline.
+   *
+   * @param pipeline the pipeline to execute the join on
+   * @param typeFamily the type family passed through to {@link #join}
+   * @throws IOException if a resource file cannot be copied
+   */
+  protected void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+    String maughamInputPath = tmpDir.copyResourceFileName("maugham.txt");
+
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    PCollection<String> maugham = pipeline.readTextFile(maughamInputPath);
+    PTable<String, Long> joined = join(shakespeare, maugham, typeFamily);
+    Iterable<Pair<String, Long>> lines = joined.materialize();
+
+    assertPassed(lines);
+
+    pipeline.done();
+  }
+
+  @Test
+  public void testWritableJoin() throws Exception {
+    // Use the concrete test class rather than hard-coding one particular subclass.
+    run(new MRPipeline(getClass(), tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvroJoin() throws Exception {
+    // Use the concrete test class rather than hard-coding one particular subclass.
+    run(new MRPipeline(getClass(), tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Used to check that the result of the join makes sense.
+   * 
+   * @param lines
+   *          The result of the join.
+   */
+  public abstract void assertPassed(Iterable<Pair<String, Long>> lines);
+
+  /**
+   * @return The JoinFn to use.
+   */
+  protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
new file mode 100644
index 0000000..4ad2a81
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class LeftOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
new file mode 100644
index 0000000..8bb5586
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests for {@link MapsideJoin}, run against both the in-memory pipeline and
+ * {@link MRPipeline}.
+ */
+public class MapsideJoinIT {
+  
+  /** Value of {@code java.io.tmpdir} before the tests ran; {@code null} if it was unset. */
+  private static String saveTempDir;
+  
+  @BeforeClass
+  public static void setUpClass(){
+    
+    // Ensure a consistent temporary directory for use of the DistributedCache.
+    
+    // The DistributedCache technically isn't supported when running in local mode, and the
+    // default temporary directory "/tmp" is used as its location. This typically only causes
+    // an issue when running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its
+    // default temporary directory. The following call ensures that "/tmp" is used as the
+    // temporary directory on all platforms.
+    saveTempDir = System.setProperty("java.io.tmpdir", "/tmp");
+  }
+  
+  @AfterClass
+  public static void tearDownClass(){
+    // System.setProperty rejects null values, so restore "unset" by clearing the property.
+    if (saveTempDir == null) {
+      System.clearProperty("java.io.tmpdir");
+    } else {
+      System.setProperty("java.io.tmpdir", saveTempDir);
+    }
+  }
+
+  /** Parses a {@code key|value} line into an (integer key, string value) pair. */
+  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+    @Override
+    public Pair<Integer, String> map(String input) {
+      String[] fields = input.split("\\|");
+      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+    }
+  }
+
+  /** Upper-cases each table value. */
+  private static class CapOrdersFn extends MapValuesFn<Integer, String, String> {
+    @Override
+    public String map(String v) {
+      return v.toUpperCase();
+    }
+  }
+  
+  /** Replaces each joined value pair with its {@code toString()} form. */
+  private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> {
+    @Override
+    public String map(Pair<String, String> v) {
+      return v.toString();
+    }
+  }
+  
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testMapSideJoin_MemPipeline() {
+    runMapsideJoin(MemPipeline.getInstance(), true);
+  }
+
+  @Test
+  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration());
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> filteredOrderTable = orderTable
+        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, filteredOrderTable);
+
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
+
+    assertTrue(materializedJoin.isEmpty());
+  }
+
+  @Test
+  public void testMapsideJoin() throws IOException {
+    runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()), false);
+  }
+
+  /**
+   * Joins customers with orders, concatenates each joined pair into a string, joins that
+   * result with the upper-cased orders, and compares the sorted output with the expected rows.
+   *
+   * @param pipeline the pipeline to run the joins on
+   * @param inMemory when {@code false}, additionally asserts that the run reports exactly
+   *          two stage results
+   */
+  private void runMapsideJoin(Pipeline pipeline, boolean inMemory) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+    
+    PTable<Integer, String> custOrders = MapsideJoin.join(customerTable, orderTable)
+        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
+
+    PTable<Integer, String> upperCasedOrders = orderTable.parallelDo(new CapOrdersFn(),
+        orderTable.getPTableType());
+    
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(custOrders, upperCasedOrders);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+    
+    PipelineResult res = pipeline.run();
+    if (!inMemory) {
+      assertEquals(2, res.getStageResults().size());
+    }
+     
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  /**
+   * Copies the named resource into the temporary directory and reads it as a table of
+   * (integer key, string value) rows via {@link LineSplitter}.
+   *
+   * @param pipeline the pipeline to read with
+   * @param filename name of the resource file to copy and read
+   * @return the parsed table
+   * @throws RuntimeException wrapping any {@link IOException} raised while copying the resource
+   */
+  private PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+    try {
+      return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable",
+          new LineSplitter(),
+          Writables.tableOf(Writables.ints(), Writables.strings()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
new file mode 100644
index 0000000..f1ca770
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.types.avro.Avros.records;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Joins two Avro inputs that use different specific-record schemas ({@link Person} and
+ * {@link Employee}) on their shared {@code name} field.
+ */
+public class MultiAvroSchemaJoinIT {
+
+  private File personFile;
+  private File employeeFile;
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Creates the two temporary Avro files: three people (Josh, Kate, Mike) and one
+   * employee (Kate).
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.personFile = File.createTempFile("person", ".avro");
+    this.employeeFile = File.createTempFile("employee", ".avro");
+
+    DatumWriter<Person> pdw = new SpecificDatumWriter<Person>();
+    DataFileWriter<Person> pfw = new DataFileWriter<Person>(pdw);
+    pfw.create(Person.SCHEMA$, personFile);
+    try {
+      Person p1 = new Person();
+      p1.name = "Josh";
+      p1.age = 19;
+      p1.siblingnames = ImmutableList.<CharSequence> of("Kate", "Mike");
+      pfw.append(p1);
+      Person p2 = new Person();
+      p2.name = "Kate";
+      p2.age = 17;
+      p2.siblingnames = ImmutableList.<CharSequence> of("Josh", "Mike");
+      pfw.append(p2);
+      Person p3 = new Person();
+      p3.name = "Mike";
+      p3.age = 12;
+      p3.siblingnames = ImmutableList.<CharSequence> of("Josh", "Kate");
+      pfw.append(p3);
+    } finally {
+      // Close the writer even if an append fails so the file handle is not leaked.
+      pfw.close();
+    }
+
+    DatumWriter<Employee> edw = new SpecificDatumWriter<Employee>();
+    DataFileWriter<Employee> efw = new DataFileWriter<Employee>(edw);
+    efw.create(Employee.SCHEMA$, employeeFile);
+    try {
+      Employee e1 = new Employee();
+      e1.name = "Kate";
+      e1.salary = 100000;
+      e1.department = "Marketing";
+      efw.append(e1);
+    } finally {
+      // Close the writer even if the append fails so the file handle is not leaked.
+      efw.close();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    personFile.delete();
+    employeeFile.delete();
+  }
+
+  /**
+   * Extracts the {@code name} field of any specific record as a {@link String}, looking the
+   * field up through the record's own schema.
+   */
+  public static class NameFn<K extends SpecificRecord> extends MapFn<K, String> {
+    @Override
+    public String map(K input) {
+      Schema s = input.getSchema();
+      Schema.Field f = s.getField("name");
+      return input.get(f.pos()).toString();
+    }
+  }
+
+  /**
+   * Keys both inputs by name, joins them, and expects the single match to be Kate on
+   * both sides.
+   */
+  @Test
+  public void testJoin() throws Exception {
+    Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> people = p.read(
+        From.avroFile(personFile.getAbsolutePath(), records(Person.class)));
+    PCollection<Employee> employees = p.read(
+        From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class)));
+
+    Iterable<Pair<Person, Employee>> result = people.by(new NameFn<Person>(), strings())
+        .join(employees.by(new NameFn<Employee>(), strings())).values().materialize();
+    List<Pair<Person, Employee>> v = Lists.newArrayList(result);
+    assertEquals(1, v.size());
+    assertEquals("Kate", v.get(0).first().name.toString());
+    assertEquals("Kate", v.get(0).second().name.toString());
+
+    // Finish the pipeline, as the other integration tests in this package do.
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
new file mode 100644
index 0000000..d889b61
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class RightOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new RightOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java b/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java
new file mode 100644
index 0000000..97cf0de
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * Static factory methods for working with {@link TemporaryPath}.
+ */
+public final class TemporaryPaths {
+
+  /**
+   * Static factory returning a {@link TemporaryPath} with adjusted {@link Configuration}
+   * properties: it is given {@link RuntimeParameters#TMP_DIR} and {@code "hadoop.tmp.dir"}.
+   */
+  public static TemporaryPath create() {
+    return new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir");
+  }
+
+  private TemporaryPaths() {
+    // Not instantiable: this class only exposes static helpers.
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/test/Tests.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/test/Tests.java b/crunch-core/src/it/java/org/apache/crunch/test/Tests.java
new file mode 100644
index 0000000..e381c1a
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/test/Tests.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.hadoop.io.Writable;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Resources;
+
+
+/**
+ * Static utilities for integration tests.
+ */
+public final class Tests {
+
+  private Tests() {
+    // Not instantiable: this class only exposes static helpers.
+  }
+
+  /**
+   * Get the path to an integration test resource file, as per naming convention.
+   *
+   * @param testCase The executing test case instance
+   * @param resourceName The file name of the resource
+   * @return The path to the resource (never null)
+   * @throws IllegalArgumentException Thrown if the resource doesn't exist
+   */
+  public static String pathTo(Object testCase, String resourceName) {
+    String qualifiedName = resource(testCase, resourceName);
+    return Resources.getResource(qualifiedName).getFile();
+  }
+
+  /**
+   * Build the qualified resource name for a test case; doesn't check whether it exists!
+   *
+   * @param testCase The executing test case instance (must not be null)
+   * @param resourceName The file name of the resource (must not be null)
+   * @return The test class name with dots as slashes, then "Data/" and resourceName (never null)
+   */
+  public static String resource(Object testCase, String resourceName) {
+    checkNotNull(testCase);
+    checkNotNull(resourceName);
+
+    // Note: We append "Data" because otherwise Eclipse would complain about the
+    //       test case's class name clashing with the resource directory's name.
+    return testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName;
+  }
+
+  /**
+   * Return our two types of {@link Pipeline}s for a JUnit Parameterized test.
+   *
+   * @param testCase The executing test case's class
+   * @return The collection to return from a {@link Parameters} provider method
+   */
+  public static Collection<Object[]> pipelinesParams(Class<?> testCase) {
+    return ImmutableList.copyOf(
+        new Object[][] { { MemPipeline.getInstance() }, { new MRPipeline(testCase) }
+    });
+  }
+
+  /**
+   * Serialize the given Writable into a byte array.
+   *
+   * @param value The instance to serialize
+   * @return The serialized data
+   */
+  public static byte[] serialize(Writable value) {
+    checkNotNull(value);
+    try {
+      ByteArrayDataOutput out = ByteStreams.newDataOutput();
+      value.write(out);
+      return out.toByteArray();
+    } catch (IOException e) {
+      throw new IllegalStateException("cannot serialize", e);
+    }
+  }
+
+  /**
+   * Serialize the src Writable into a byte array, then deserialize it into dest.
+   * @param src The instance to serialize
+   * @param dest The instance to deserialize into (must be a different instance than src)
+   * @return dest, for convenience
+   */
+  public static <T extends Writable> T roundtrip(Writable src, T dest) {
+    checkNotNull(src);
+    checkNotNull(dest);
+    checkArgument(src != dest, "src and dest may not be the same instance");
+
+    try {
+      byte[] data = serialize(src);
+      dest.readFields(ByteStreams.newDataInput(data));
+    } catch (IOException e) {
+      throw new IllegalStateException("cannot deserialize", e);
+    }
+    return dest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/customers.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/customers.txt b/crunch-core/src/it/resources/customers.txt
new file mode 100644
index 0000000..98f3f3d
--- /dev/null
+++ b/crunch-core/src/it/resources/customers.txt
@@ -0,0 +1,4 @@
+111|John Doe
+222|Jane Doe
+333|Someone Else
+444|Has No Orders
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/docs.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/docs.txt b/crunch-core/src/it/resources/docs.txt
new file mode 100644
index 0000000..90a3f65
--- /dev/null
+++ b/crunch-core/src/it/resources/docs.txt
@@ -0,0 +1,6 @@
+A	this doc has this text
+A	and this text as well
+A	but also this
+B	this doc has some text
+B	but not as much as the last
+B	doc

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/emptyTextFile.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/emptyTextFile.txt b/crunch-core/src/it/resources/emptyTextFile.txt
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/letters.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/letters.txt b/crunch-core/src/it/resources/letters.txt
new file mode 100644
index 0000000..916bfc9
--- /dev/null
+++ b/crunch-core/src/it/resources/letters.txt
@@ -0,0 +1,2 @@
+a
+bb
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/log4j.properties b/crunch-core/src/it/resources/log4j.properties
new file mode 100644
index 0000000..5d144a0
--- /dev/null
+++ b/crunch-core/src/it/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ***** Set the org.apache.crunch logger level to INFO and its appender to A.
+log4j.logger.org.apache.crunch=info, A
+
+# Log warnings on Hadoop for the local runner when testing
+log4j.logger.org.apache.hadoop=warn, A
+# Except for Configuration, which is chatty.
+log4j.logger.org.apache.hadoop.conf.Configuration=error, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n


Mime
View raw message