hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r591226 - in /lucene/hadoop/trunk: CHANGES.txt src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
Date: Fri, 02 Nov 2007 04:52:40 GMT
Author: ddas
Date: Thu Nov  1 21:52:33 2007
New Revision: 591226

URL: http://svn.apache.org/viewvc?rev=591226&view=rev
Log:
HADOOP-2089.  Fixes the command line argument handling to allow multiple -cacheArchive
options in Hadoop streaming. Contributed by Lohit Vijayarenu.
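
For context, this lets one streaming job ship several archives, each unpacked under
its own symlink in the task's working directory. A minimal sketch of the argument
shape the fix makes legal (the HDFS URIs and link names below are invented for
illustration; the shape mirrors what the new test's genArgs() builds and hands to
"new StreamJob(args, true)"):

    public class MultipleCacheArchiveArgs {
      public static void main(String[] ignored) {
        // Two -cacheArchive occurrences in a single job; before this fix
        // the parser accepted at most one occurrence of the flag.
        String[] streamJobArgs = {
          "-input", "input.txt",
          "-output", "out",
          "-mapper", "cat",
          "-reducer", "cat",
          "-cacheArchive", "hdfs://namenode/user/alice/dict1.zip#dict1",
          "-cacheArchive", "hdfs://namenode/user/alice/dict2.zip#dict2"
        };
        System.out.println(streamJobArgs.length + " job arguments");
      }
    }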

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=591226&r1=591225&r2=591226&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Nov  1 21:52:33 2007
@@ -63,6 +63,10 @@
     HADOOP-1622 i.e. http://svn.apache.org/viewvc?view=rev&revision=588771.
     (acmurthy)
 
+    HADOOP-2089.  Fixes the command line argument handling to allow multiple
+    -cacheArchive options in Hadoop streaming.  (Lohit Vijayarenu via ddas)
+
+
 Release 0.15.1 -
 
   BUG FIXES

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=591226&r1=591225&r2=591226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu Nov  1 21:52:33 2007
@@ -404,7 +404,7 @@
     Option cacheFile = createOption("cacheFile", 
                                     "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
     Option cacheArchive = createOption("cacheArchive", 
-                                       "File name URI", "fileNameURI", 1, false);
+                                       "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
     
     // boolean properties
     

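The one-line change above raises the option's maximum occurrence count from 1 to
Integer.MAX_VALUE, so the parser can accept any number of -cacheArchive occurrences
rather than capping them at one. A standalone sketch of that semantics (this is not
the actual createOption/parser code, only an illustration of the behavior the new
limit buys):

    import java.util.ArrayList;
    import java.util.List;

    public class RepeatableOptionSketch {
      // Illustrative only: gather every value of a repeatable flag,
      // enforcing an upper bound the way an option's max count would.
      static List<String> collect(String[] argv, String flag, int maxOccurs) {
        List<String> values = new ArrayList<String>();
        for (int i = 0; i + 1 < argv.length; i++) {
          if (flag.equals(argv[i])) {
            if (values.size() >= maxOccurs) {
              throw new IllegalArgumentException(flag + " given too many times");
            }
            values.add(argv[++i]);
          }
        }
        return values;
      }

      public static void main(String[] args) {
        String[] argv = { "-cacheArchive", "a.zip#one", "-cacheArchive", "b.zip#two" };
        // maxOccurs = 1 models a single-occurrence cap (this sketch throws
        // on the second flag); Integer.MAX_VALUE keeps both values.
        System.out.println(collect(argv, "-cacheArchive", Integer.MAX_VALUE));
      }
    }
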
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java?rev=591226&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java Thu Nov  1 21:52:33 2007
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.DataOutputStream;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.util.zip.ZipEntry;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+
+/**
+ * This class tests the cacheArchive option of streaming.
+ * The test creates two archive files, ships them with Hadoop
+ * streaming, and compares the output with the expected output.
+ */
+public class TestMultipleArchiveFiles extends TestStreaming
+{
+
+  private StreamJob job;
+  private String INPUT_FILE = "input.txt";
+  private String CACHE_ARCHIVE_1 = "cacheArchive1.zip";
+  private File CACHE_FILE_1 = null;
+  private String CACHE_ARCHIVE_2 = "cacheArchive2.zip";
+  private File CACHE_FILE_2 = null;
+  private String expectedOutput = null;
+  private String OUTPUT_DIR = "out";
+  private Configuration conf = null;
+  private MiniDFSCluster dfs = null;
+  private MiniMRCluster mr = null;
+  private FileSystem fileSys = null;
+  private String strJobTracker = null;
+  private String strNamenode = null;
+  private String namenode = null;
+
+  public TestMultipleArchiveFiles() throws IOException {
+    CACHE_FILE_1 = new File("cacheArchive1");
+    CACHE_FILE_2 = new File("cacheArchive2");
+    input = "HADOOP";
+    expectedOutput = "HADOOP\t\nHADOOP\t\n";
+    try {
+      conf = new Configuration();      
+      dfs = new MiniDFSCluster(conf, 1, true, null);      
+      fileSys = dfs.getFileSystem();
+      namenode = fileSys.getUri().getAuthority();
+      mr  = new MiniMRCluster(1, namenode, 3);
+      strJobTracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
+      strNamenode = "fs.default.name=" + namenode;
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+  
+  protected void createInput() throws IOException
+  {
+
+    DataOutputStream dos = fileSys.create(new Path(INPUT_FILE));
+    String inputFileString = "symlink1/cacheArchive1\nsymlink2/cacheArchive2";
+    dos.write(inputFileString.getBytes("UTF-8"));
+    dos.close();
+    
+    DataOutputStream out = fileSys.create(new Path(CACHE_ARCHIVE_1.toString()));
+    ZipOutputStream zos = new ZipOutputStream(out);
+    ZipEntry ze = new ZipEntry(CACHE_FILE_1.toString());
+    zos.putNextEntry(ze);
+    zos.write(input.getBytes("UTF-8"));
+    zos.closeEntry();
+    zos.close();
+
+    out = fileSys.create(new Path(CACHE_ARCHIVE_2.toString()));
+    zos = new ZipOutputStream(out);
+    ze = new ZipEntry(CACHE_FILE_2.toString());
+    zos.putNextEntry(ze);
+    zos.write(input.getBytes("UTF-8"));
+    zos.closeEntry();
+    zos.close();
+  }
+
+  protected String[] genArgs() {
+    String cacheArchiveString1 = null;
+    String cacheArchiveString2 = null;
+    try {
+      cacheArchiveString1 = fileSys.getUri().toString()+fileSys.getWorkingDirectory().toString()+"/"+CACHE_ARCHIVE_1+"#symlink1";
+      cacheArchiveString2 = fileSys.getUri().toString()+fileSys.getWorkingDirectory().toString()+"/"+CACHE_ARCHIVE_2+"#symlink2";
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return new String[] {
+      "-input", INPUT_FILE.toString(),
+      "-output", OUTPUT_DIR,
+      "-mapper", "xargs cat", 
+      "-reducer", "cat",
+      "-jobconf", "mapred.reduce.tasks=1",
+      "-cacheArchive", cacheArchiveString1, 
+      "-cacheArchive", cacheArchiveString2,
+      "-jobconf", strNamenode,
+      "-jobconf", strJobTracker,
+      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  public void testCommandLine() {
+    try {
+      createInput();
+      job = new StreamJob(genArgs(), true);
+      if(job.go() != 0) {
+        throw new Exception("Job Failed");
+      }
+      StringBuffer output = new StringBuffer(256);
+      Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+      for (int i = 0; i < fileList.length; i++){
+        BufferedReader bread =
+          new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+        output.append(bread.readLine());
+        output.append("\n");
+        output.append(bread.readLine());
+        output.append("\n");
+      }
+      assertEquals(expectedOutput, output.toString());
+    } catch (Exception e) {
+      // Don't swallow failures here: report and fail the test.
+      e.printStackTrace();
+      fail(e.toString());
+    } finally {
+      CACHE_FILE_1.delete();
+      CACHE_FILE_2.delete();
+    }
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestMultipleArchiveFiles().testCommandLine();
+  }
+}
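
To trace the test's data flow: each zip holds a single entry (cacheArchive1 or
cacheArchive2) whose body is the word HADOOP; the #symlink1 and #symlink2 fragments
expose the unpacked archives as ./symlink1 and ./symlink2 in the task directory, so
the "xargs cat" mapper resolves each input line (e.g. symlink1/cacheArchive1) to an
entry's contents and the job emits HADOOP once per archive. The archive-building
step in isolation, as a hedged standalone sketch that writes to memory instead of
HDFS:

    import java.io.ByteArrayOutputStream;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipOutputStream;

    public class ZipArchiveSketch {
      public static void main(String[] args) throws Exception {
        // Same pattern as createInput() above: one entry named
        // "cacheArchive1" whose body is "HADOOP".
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ZipOutputStream zos = new ZipOutputStream(bytes);
        zos.putNextEntry(new ZipEntry("cacheArchive1"));
        zos.write("HADOOP".getBytes("UTF-8"));
        zos.closeEntry();
        zos.close();
        System.out.println("archive size: " + bytes.size() + " bytes");
      }
    }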


