avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1141975 - in /avro/trunk: ./ lang/java/mapred/ lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/ lang/java/mapred/src/test/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/
Date Fri, 01 Jul 2011 16:23:40 GMT
Author: cutting
Date: Fri Jul  1 16:23:39 2011
New Revision: 1141975

URL: http://svn.apache.org/viewvc?rev=1141975&view=rev
Log:
AVRO-847. Java: Add a unit test for Java MapReduce tether.  Contributed by Jeremy Lewi.

Added:
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/   (props changed)
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jul  1 16:23:39 2011
@@ -17,6 +17,8 @@ Avro 1.6.0 (unreleased)
     AVRO-841. Java: Implement insertion in GenericData.Array.
     (Nick Palmer via cutting)
 
+    AVRO-847. Java: Add a unit test for Java MapReduce tether. (Jeremy Lewi)
+
   BUG FIXES
 
     AVRO-845. setup.py uses Python2.7+ specific code

Propchange: avro/trunk/lang/java/mapred/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Jul  1 16:23:39 2011
@@ -1,4 +1,5 @@
 target
+userlogs
 .classpath
 .settings
 .project

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java Fri Jul  1 16:23:39 2011
@@ -35,6 +35,10 @@ import org.apache.hadoop.mapred.RunningJ
 @SuppressWarnings("deprecation")
 public class TetherJob extends Configured {
 
+  public static final String TETHER_EXEC="avro.tether.executable";
+  public static final String TETHER_EXEC_ARGS="avro.tether.executable_args";
+  public static final String TETHER_EXEC_CACHED="avro.tether.executable_cached";
+  
   /** Get the URI of the application's executable. */
   public static URI getExecutable(JobConf job) {
     try {
@@ -46,7 +50,23 @@ public class TetherJob extends Configure
   
   /** Set the URI for the application's executable. Normally this in HDFS. */
   public static void setExecutable(JobConf job, URI executable) {
-    job.set("avro.tether.executable", executable.toString());
+    setExecutable(job,executable,"",false);
+  }
+  
+  /**
+   * Set the URI for the application's executable (i.e the program to run in a subprocess
+   * and provides the mapper/reducer). 
+   * @param job - Job
+   * @param executable - The URI of the executable
+   * @param argstr - A string of additional arguments
+   * @param cached - If true, the executable URI is cached using DistributedCache
+   *               - if false its not cached. I.e if the file is already stored on each local file system
+   *                or if its on a NFS share
+   */
+  public static void setExecutable(JobConf job, URI executable,String argstr,boolean cached) {
+        job.set(TETHER_EXEC, executable.toString());
+        job.set(TETHER_EXEC_ARGS, argstr);
+        job.set(TETHER_EXEC_CACHED,  (new Boolean(cached)).toString());
   }
 
   /** Submit a job to the map/reduce cluster. All of the necessary
@@ -76,6 +96,9 @@ public class TetherJob extends Configure
     job.setOutputKeyComparatorClass(TetherKeyComparator.class);
     job.setMapOutputValueClass(NullWritable.class);
 
+    // set the map output key class to TetherData
+    job.setMapOutputKeyClass(TetherData.class);
+    
     // add TetherKeySerialization to io.serializations
     Collection<String> serializations =
       job.getStringCollection("io.serializations");
@@ -84,8 +107,11 @@ public class TetherJob extends Configure
       job.setStrings("io.serializations",
                      serializations.toArray(new String[0]));
     }
-    
-    DistributedCache.addCacheFile(getExecutable(job), job);
+
+    // determine whether the executable should be added to the cache.
+    if (job.getBoolean(TETHER_EXEC_CACHED,false)){
+      DistributedCache.addCacheFile(getExecutable(job), job);
+    }
   }
 
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java Fri Jul  1 16:23:39 2011
@@ -75,7 +75,7 @@ class TetherKeySerialization
     
     public void open(OutputStream out) {
       this.out = out;
-      this.encoder = EncoderFactory.get().binaryEncoder(out, null);
+      this.encoder = EncoderFactory.get().directBinaryEncoder(out, encoder);
     }
 
     public void serialize(TetherData datum) throws IOException {

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java Fri Jul  1 16:23:39 2011
@@ -38,14 +38,19 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileUtil;
 
 import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.SocketTransceiver;
 import org.apache.avro.ipc.Server;
-import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SaslSocketServer;
+import org.apache.avro.ipc.SaslSocketTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.avro.ipc.specific.SpecificResponder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 class TetheredProcess  {
 
+  static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+
   private JobConf job;
 
   TetherOutputService outputService;
@@ -60,17 +65,28 @@ class TetheredProcess  {
     try {
       // start server
       this.outputService = new TetherOutputService(collector, reporter);
-      this.outputServer = new SocketServer
+      this.outputServer = new SaslSocketServer
         (new SpecificResponder(OutputProtocol.class, outputService),
          new InetSocketAddress(0));
       outputServer.start();
       
       // start sub-process, connecting back to server
       this.subprocess = startSubprocess(job);
-      
-      // open client, connecting to sub-process
-      this.clientTransceiver =
-        new SocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+
+      // check if the process has exited -- is there a better way to do this?
+      boolean hasexited = false;
+      try {
+        // exitValue throws an exception if process hasn't exited
+        this.subprocess.exitValue();
+        hasexited = true;
+      } catch (IllegalThreadStateException e) {
+      }
+      if (hasexited) {
+        LOG.error("Could not start subprocess");
+        throw new RuntimeException("Could not start subprocess");
+      }
+      this.clientTransceiver
+        = new SaslSocketTransceiver(new InetSocketAddress(outputService.inputPort()));
       this.inputClient =
         SpecificRequestor.getClient(InputProtocol.class, clientTransceiver);
 
@@ -96,15 +112,39 @@ class TetheredProcess  {
     throws IOException, InterruptedException {
     // get the executable command
     List<String> command = new ArrayList<String>();
-    Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
-    if (localFiles == null) {                     // until MAPREDUCE-476
-      URI[] files = DistributedCache.getCacheFiles(job);
-      localFiles = new Path[] { new Path(files[0].toString()) };
+
+    String executable="";
+    if (job.getBoolean(TetherJob.TETHER_EXEC_CACHED,false)){
+      //we want to use the cached executable
+      Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
+      if (localFiles == null) {                     // until MAPREDUCE-476
+        URI[] files = DistributedCache.getCacheFiles(job);
+        localFiles = new Path[] { new Path(files[0].toString()) };
+      }
+      executable=localFiles[0].toString();
+      FileUtil.chmod(executable.toString(), "a+x");
+    }
+    else {
+      executable=job.get(TetherJob.TETHER_EXEC);
     }
-    String executable = localFiles[0].toString();
-    FileUtil.chmod(executable, "a+x");
+
     command.add(executable);
 
+    // Add the executable arguments. We assume the arguments are separated by
+    // spaces so we split the argument string based on spaces and add each
+    // token to command We need to do it this way because
+    // TaskLog.captureOutAndError will put quote marks around each argument so
+    // if we pass a single string containing all arguments we get quoted
+    // incorrectly
+    String args=job.get(TetherJob.TETHER_EXEC_ARGS);
+    String[] aparams=args.split(" ");
+    for (int i=0;i<aparams.length; i++){            
+      aparams[i]=aparams[i].trim();
+      if (aparams[i].length()>0){
+        command.add(aparams[i]);
+      }
+    }
+
     if (System.getProperty("hadoop.log.dir") == null
         && System.getenv("HADOOP_LOG_DIR") != null)
       System.setProperty("hadoop.log.dir", System.getenv("HADOOP_LOG_DIR"));

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java Fri Jul  1 16:23:39 2011
@@ -50,15 +50,15 @@ import org.apache.avro.specific.Specific
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.DataFileStream;
 
-class WordCountUtil {
+public class WordCountUtil {
 
   private static final File DIR
     = new File(System.getProperty("test.dir", ".") + "/mapred");
-  private static final File LINES_FILE
+  public static final File LINES_FILE
     = new File(new File(DIR, "in"), "lines.avro");
   private static final File LINES_TEXT_FILE
     = new File(new File(DIR, "in"), "lines.txt");
-  private static final File COUNTS_FILE
+  public static final File COUNTS_FILE
     = new File(new File(DIR, "out"), "part-00000.avro");
   private static final File SORTED_FILE
     = new File(new File(DIR, "out"), "part-00000.avro");

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java?rev=1141975&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java Fri Jul  1 16:23:39 2011
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.Protocol;
+import org.junit.Test;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.WordCountUtil;
+import org.apache.avro.mapred.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+
+public class TestWordCountTether {
+
+
+  @Test
+    @SuppressWarnings("deprecation")
+    public void testJob() throws Exception {
+
+    System.out.println(System.getProperty("java.class.path"));
+    JobConf job = new JobConf();
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+
+    outputPath.getFileSystem(job).delete(outputPath);
+
+    // create the input file
+    WordCountUtil.writeLinesFile();
+
+    java.net.URI exec= new java.net.URI("java");
+
+    //input path
+    String in=dir+"/in";
+
+    //create a string of the arguments
+    String execargs="-classpath " + System.getProperty("java.class.path");
+    execargs+= " org.apache.avro.mapred.tether.WordCountTask";
+
+    FileInputFormat.addInputPaths(job, in);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    TetherJob.setExecutable(job, exec,execargs,false);
+
+    Schema outscheme= new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema();
+    AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
+    job.set(AvroJob.OUTPUT_SCHEMA, outscheme.toString());
+
+    TetherJob.runJob(job);
+
+    // validate the output
+    DatumReader<Pair<Utf8,Long>> reader
+      = new SpecificDatumReader<Pair<Utf8,Long>>();
+    InputStream cin = new BufferedInputStream(new FileInputStream(WordCountUtil.COUNTS_FILE));
+    DataFileStream<Pair<Utf8,Long>> counts
+      = new DataFileStream<Pair<Utf8,Long>>(cin,reader);
+    int numWords = 0;
+    for (Pair<Utf8,Long> wc : counts) {
+      assertEquals(wc.key().toString(),
+                   WordCountUtil.COUNTS.get(wc.key().toString()), wc.value());
+      numWords++;
+    }
+
+    cin.close();
+    assertEquals(WordCountUtil.COUNTS.size(), numWords);
+
+  }
+
+
+}

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java Fri Jul  1 16:23:39 2011
@@ -50,7 +50,7 @@ public class WordCountTask
     sum = 0;
   }
 
-  public static void main(String... args) throws Exception {
+  public static void main(String[] args) throws Exception {
     new TetherTaskRunner(new WordCountTask()).join();
   }
 



Mime
View raw message