hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1546951 [2/2] - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/join/ core/src/test/java/org/apache/hama/bsp/
Date Mon, 02 Dec 2013 10:41:33 GMT
Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCompositeInputFormat.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCompositeInputFormat.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCompositeInputFormat.java Mon Dec
 2 10:41:32 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.join.CompositeInputFormat;
+import org.apache.hama.bsp.join.TupleWritable;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.util.KeyValuePair;
+
+public class TestCompositeInputFormat extends HamaCluster {
+  protected HamaConfiguration configuration;
+
+  public TestCompositeInputFormat() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  public static final Log LOG = LogFactory
+      .getLog(TestCompositeInputFormat.class);
+
+  public void testCompositeInputFormat() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    generateTestData();
+
+    HamaConfiguration conf = new HamaConfiguration();
+    BSPJob job = new BSPJob(conf);
+
+    FileInputFormat.setInputPaths(job, "/tmp/a.dat,/tmp/b.dat");
+
+    job.setInputFormat(CompositeInputFormat.class);
+    job.set("bsp.join.expr", CompositeInputFormat.compose("outer",
+        SequenceFileInputFormat.class, FileInputFormat.getInputPaths(job)));
+    job.setOutputFormat(NullOutputFormat.class);
+
+    job.setBspClass(JoinBSP.class);
+    job.waitForCompletion(true);
+
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(new Path("/tmp/a.dat"), true);
+    fs.delete(new Path("/tmp/b.dat"), true);
+  }
+
+  public static class JoinBSP extends
+      BSP<IntWritable, TupleWritable, NullWritable, NullWritable, NullWritable> {
+
+    @Override
+    public void bsp(
+        BSPPeer<IntWritable, TupleWritable, NullWritable, NullWritable, NullWritable>
peer)
+        throws IOException, SyncException, InterruptedException {
+      KeyValuePair<IntWritable, TupleWritable> readNext = null;
+      while ((readNext = peer.readNext()) != null) {
+        int a = ((IntWritable) readNext.getValue().get(0)).get();
+        int b = ((IntWritable) readNext.getValue().get(1)).get();
+
+        assertEquals(a + b, 10);
+      }
+    }
+  }
+
+  private void generateTestData() {
+    try {
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, configuration,
+          new Path("/tmp/a.dat"), IntWritable.class, IntWritable.class);
+      for (int i = 0; i < 10; i++) {
+        writer.append(new IntWritable(i), new IntWritable(i));
+      }
+      writer.close();
+
+      writer = SequenceFile.createWriter(fs, configuration, new Path(
+          "/tmp/b.dat"), IntWritable.class, IntWritable.class);
+      for (int i = 0; i < 10; i++) {
+        writer.append(new IntWritable(i), new IntWritable(10 - i));
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}



Mime
View raw message