hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r529410 [13/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...
Date Mon, 16 Apr 2007 21:44:46 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/TwoDArrayWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/TwoDArrayWritable.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/TwoDArrayWritable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/TwoDArrayWritable.java Mon Apr 16 14:44:35 2007
@@ -36,16 +36,16 @@
   }
 
   public Object toArray() {
-      int dimensions[] = {values.length, 0};
-      Object result = Array.newInstance(valueClass, dimensions);
-      for (int i = 0; i < values.length; i++) {
-          Object resultRow = Array.newInstance(valueClass, values[i].length);
-          Array.set(result, i, resultRow);
-          for (int j = 0; j < values[i].length; j++) {
-              Array.set(resultRow, j, values[i][j]);
-          }
+    int dimensions[] = {values.length, 0};
+    Object result = Array.newInstance(valueClass, dimensions);
+    for (int i = 0; i < values.length; i++) {
+      Object resultRow = Array.newInstance(valueClass, values[i].length);
+      Array.set(result, i, resultRow);
+      for (int j = 0; j < values[i].length; j++) {
+        Array.set(resultRow, j, values[i][j]);
       }
-      return result;
+    }
+    return result;
   }
 
   public void set(Writable[][] values) { this.values = values; }
@@ -56,35 +56,35 @@
     // construct matrix
     values = new Writable[in.readInt()][];          
     for (int i = 0; i < values.length; i++) {
-        values[i] = new Writable[in.readInt()];
+      values[i] = new Writable[in.readInt()];
     }
 
     // construct values
     for (int i = 0; i < values.length; i++) {
-        for (int j = 0; j < values[i].length; j++) {
-            Writable value;                             // construct value
-            try {
-                value = (Writable)valueClass.newInstance();
-            } catch (InstantiationException e) {
-                throw new RuntimeException(e.toString());
-            } catch (IllegalAccessException e) {
-                throw new RuntimeException(e.toString());
-            }
-            value.readFields(in);                       // read a value
-            values[i][j] = value;                       // store it in values
+      for (int j = 0; j < values[i].length; j++) {
+        Writable value;                             // construct value
+        try {
+          value = (Writable)valueClass.newInstance();
+        } catch (InstantiationException e) {
+          throw new RuntimeException(e.toString());
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e.toString());
         }
+        value.readFields(in);                       // read a value
+        values[i][j] = value;                       // store it in values
+      }
     }
   }
 
   public void write(DataOutput out) throws IOException {
     out.writeInt(values.length);                 // write values
     for (int i = 0; i < values.length; i++) {
-        out.writeInt(values[i].length);
+      out.writeInt(values[i].length);
     }
     for (int i = 0; i < values.length; i++) {
-        for (int j = 0; j < values[i].length; j++) {
-            values[i][j].write(out);
-        }
+      for (int j = 0; j < values[i].length; j++) {
+        values[i][j].write(out);
+      }
     }
   }
 }
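
The TwoDArrayWritable hunk above is whitespace-only; the serialization logic it re-indents can be exercised with a small round-trip like the sketch below. The two-argument constructor and the IntWritable element type are assumptions about the surrounding class (they are not shown in this hunk), and DataOutputBuffer/DataInputBuffer are used only as convenient in-memory streams.

  import java.io.IOException;

  import org.apache.hadoop.io.DataInputBuffer;
  import org.apache.hadoop.io.DataOutputBuffer;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.TwoDArrayWritable;
  import org.apache.hadoop.io.Writable;

  public class TwoDArrayWritableDemo {
    public static void main(String[] args) throws IOException {
      // A ragged 2-D matrix of IntWritable values.
      Writable[][] matrix = {
        { new IntWritable(1), new IntWritable(2) },
        { new IntWritable(3) }
      };
      TwoDArrayWritable original = new TwoDArrayWritable(IntWritable.class, matrix);

      // write() emits the row count, each row length, then every value.
      DataOutputBuffer out = new DataOutputBuffer();
      original.write(out);

      // readFields() rebuilds the matrix, instantiating valueClass for each cell.
      TwoDArrayWritable copy = new TwoDArrayWritable(IntWritable.class);
      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      copy.readFields(in);

      IntWritable[][] back = (IntWritable[][]) copy.toArray();
      System.out.println(back[0][1]);   // prints 2
    }
  }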

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/UTF8.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/UTF8.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/UTF8.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/UTF8.java Mon Apr 16 14:44:35 2007
@@ -70,7 +70,7 @@
   public void set(String string) {
     if (string.length() > 0xffff/3) {             // maybe too long
       LOG.warn("truncating long string: " + string.length()
-                  + " chars, starting with " + string.substring(0, 20));
+               + " chars, starting with " + string.substring(0, 20));
       string = string.substring(0, 0xffff/3);
     }
 
@@ -235,7 +235,7 @@
   public static int writeString(DataOutput out, String s) throws IOException {
     if (s.length() > 0xffff/3) {         // maybe too long
       LOG.warn("truncating long string: " + s.length()
-                  + " chars, starting with " + s.substring(0, 20));
+               + " chars, starting with " + s.substring(0, 20));
       s = s.substring(0, 0xffff/3);
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java Mon Apr 16 14:44:35 2007
@@ -36,14 +36,14 @@
     GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
     byte[] outbuf = new byte[length];
     ByteArrayOutputStream bos =  new ByteArrayOutputStream();
-     int len;
-     while((len=gzi.read(outbuf,0,outbuf.length)) != -1){
-       bos.write(outbuf,0,len);
-     }
-     byte[] decompressed =  bos.toByteArray();
-     bos.close();
-     gzi.close();
-     return decompressed;
+    int len;
+    while((len=gzi.read(outbuf,0,outbuf.length)) != -1){
+      bos.write(outbuf,0,len);
+    }
+    byte[] decompressed =  bos.toByteArray();
+    bos.close();
+    gzi.close();
+    return decompressed;
   }
 
   public static void skipCompressedByteArray(DataInput in) throws IOException {
@@ -61,7 +61,7 @@
       int len = buffer.length;
       out.writeInt(len);
       out.write(buffer,0,len);
-    /* debug only! Once we have confidence, can lose this. */
+      /* debug only! Once we have confidence, can lose this. */
       return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
     } else {
       out.writeInt(-1);
@@ -212,10 +212,10 @@
    * Allocate a buffer for each thread that tries to clone objects.
    */
   private static ThreadLocal cloneBuffers = new ThreadLocal() {
-    protected synchronized Object initialValue() {
-      return new CopyInCopyOutBuffer();
-    }
-  };
+      protected synchronized Object initialValue() {
+        return new CopyInCopyOutBuffer();
+      }
+    };
   
   /**
    * Make a copy of a writable object using serialization to a buffer.
@@ -253,7 +253,7 @@
    * @throws java.io.IOException 
    */
   public static void writeVInt(DataOutput stream, int i) throws IOException {
-      writeVLong(stream, i);
+    writeVLong(stream, i);
   }
   
   /**
@@ -272,32 +272,32 @@
    * @throws java.io.IOException 
    */
   public static void writeVLong(DataOutput stream, long i) throws IOException {
-      if (i >= -112 && i <= 127) {
-          stream.writeByte((byte)i);
-          return;
-      }
+    if (i >= -112 && i <= 127) {
+      stream.writeByte((byte)i);
+      return;
+    }
       
-      int len = -112;
-      if (i < 0) {
-          i ^= -1L; // take one's complement'
-          len = -120;
-      }
+    int len = -112;
+    if (i < 0) {
+      i ^= -1L; // take one's complement'
+      len = -120;
+    }
       
-      long tmp = i;
-      while (tmp != 0) {
-          tmp = tmp >> 8;
-          len--;
-      }
+    long tmp = i;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
       
-      stream.writeByte((byte)len);
+    stream.writeByte((byte)len);
       
-      len = (len < -120) ? -(len + 120) : -(len + 112);
+    len = (len < -120) ? -(len + 120) : -(len + 112);
       
-      for (int idx = len; idx != 0; idx--) {
-          int shiftbits = (idx - 1) * 8;
-          long mask = 0xFFL << shiftbits;
-          stream.writeByte((byte)((i & mask) >> shiftbits));
-      }
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftbits;
+      stream.writeByte((byte)((i & mask) >> shiftbits));
+    }
   }
   
 
@@ -308,19 +308,19 @@
    * @return deserialized long from stream.
    */
   public static long readVLong(DataInput stream) throws IOException {
-      int len = stream.readByte();
-      if (len >= -112) {
-          return len;
-      }
-      boolean isNegative = (len < -120);
-      len = isNegative ? -(len + 120) : -(len + 112);
-      long i = 0;
-      for (int idx = 0; idx < len; idx++) {
-          byte b = stream.readByte();
-          i = i << 8;
-          i = i | (b & 0xFF);
-      }
-      return (isNegative ? (i ^ -1L) : i);
+    int len = stream.readByte();
+    if (len >= -112) {
+      return len;
+    }
+    boolean isNegative = (len < -120);
+    len = isNegative ? -(len + 120) : -(len + 112);
+    long i = 0;
+    for (int idx = 0; idx < len; idx++) {
+      byte b = stream.readByte();
+      i = i << 8;
+      i = i | (b & 0xFF);
+    }
+    return (isNegative ? (i ^ -1L) : i);
   }
 
   /**
@@ -330,7 +330,7 @@
    * @return deserialized integer from stream.
    */
   public static int readVInt(DataInput stream) throws IOException {
-      return (int) readVLong(stream);
+    return (int) readVLong(stream);
   }
   
 
@@ -339,25 +339,25 @@
    * @return the encoded length 
    */
   public static int getVIntSize(long i) {
-      if (i >= -112 && i <= 127) {
-          return 1;
-      }
+    if (i >= -112 && i <= 127) {
+      return 1;
+    }
       
-      int len = -112;
-      if (i < 0) {
-          i ^= -1L; // take one's complement'
-          len = -120;
-      }
+    int len = -112;
+    if (i < 0) {
+      i ^= -1L; // take one's complement'
+      len = -120;
+    }
       
-      long tmp = i;
-      while (tmp != 0) {
-          tmp = tmp >> 8;
-          len--;
-      }
+    long tmp = i;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
       
-      len = (len < -120) ? -(len + 120) : -(len + 112);
+    len = (len < -120) ? -(len + 120) : -(len + 112);
       
-      return len+1;
+    return len+1;
   }
   /**
    * Read an Enum value from DataInput, Enums are read and written 
@@ -379,7 +379,7 @@
    * @throws IOException
    */
   public static void writeEnum(DataOutput out,  Enum enumVal) 
-  throws IOException{
+    throws IOException{
     Text.writeString(out, enumVal.name()); 
   }
 }
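
The WritableUtils changes are likewise indentation-only; the variable-length integer encoding they touch can be demonstrated with the sketch below. Only writeVLong/readVLong/getVIntSize from the hunk are used; DataOutputBuffer and DataInputBuffer are assumed as in-memory DataOutput/DataInput implementations.

  import java.io.IOException;

  import org.apache.hadoop.io.DataInputBuffer;
  import org.apache.hadoop.io.DataOutputBuffer;
  import org.apache.hadoop.io.WritableUtils;

  public class VLongDemo {
    public static void main(String[] args) throws IOException {
      long[] samples = { 0L, 127L, 128L, -112L, -113L, 1L << 40 };

      // Values in [-112, 127] fit in one byte; anything else gets a length
      // prefix followed by the minimal number of big-endian payload bytes.
      DataOutputBuffer out = new DataOutputBuffer();
      for (long v : samples) {
        WritableUtils.writeVLong(out, v);
        System.out.println(v + " -> " + WritableUtils.getVIntSize(v) + " byte(s)");
      }

      // Decoding reads the same values back in order.
      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      for (long v : samples) {
        long back = WritableUtils.readVLong(in);
        System.out.println("decoded " + back + (back == v ? " (ok)" : " (mismatch)"));
      }
    }
  }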

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java Mon Apr 16 14:44:35 2007
@@ -34,7 +34,7 @@
    * @return a stream the user can write uncompressed data to
    */
   CompressionOutputStream createOutputStream(OutputStream out) 
-  throws IOException;
+    throws IOException;
   
   /**
    * Create a stream decompressor that will read from the given input stream.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java Mon Apr 16 14:44:35 2007
@@ -95,7 +95,7 @@
   
     int n = in.read(buffer, 0, buffer.length);
     if (n == -1) {
-        throw new EOFException("Unexpected end of input stream");
+      throw new EOFException("Unexpected end of input stream");
     }
 
     decompressor.setInput(buffer, 0, n);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Mon Apr 16 14:44:35 2007
@@ -44,9 +44,9 @@
    * @return a stream the user can write uncompressed data to
    */
   public CompressionOutputStream createOutputStream(OutputStream out) 
-  throws IOException {
+    throws IOException {
     return new CompressorStream(out, ZlibFactory.getZlibCompressor(), 
-        conf.getInt("io.file.buffer.size", 4*1024));
+                                conf.getInt("io.file.buffer.size", 4*1024));
   }
   
   /**
@@ -55,9 +55,9 @@
    * @return a stream to read uncompressed bytes from
    */
   public CompressionInputStream createInputStream(InputStream in) 
-  throws IOException {
+    throws IOException {
     return new DecompressorStream(in, ZlibFactory.getZlibDecompressor(),
-        conf.getInt("io.file.buffer.size", 4*1024));
+                                  conf.getInt("io.file.buffer.size", 4*1024));
   }
   
   /**

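DefaultCodec's createOutputStream/createInputStream, re-indented above, can be exercised in memory roughly as follows. That DefaultCodec picks up io.file.buffer.size from a Configuration supplied via setConf is an assumption based on the conf.getInt call in the hunk, not something this diff states.

  import java.io.BufferedReader;
  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.io.InputStreamReader;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.compress.CompressionInputStream;
  import org.apache.hadoop.io.compress.CompressionOutputStream;
  import org.apache.hadoop.io.compress.DefaultCodec;

  public class DefaultCodecDemo {
    public static void main(String[] args) throws IOException {
      DefaultCodec codec = new DefaultCodec();
      codec.setConf(new Configuration());   // supplies io.file.buffer.size

      // Compress a small payload into memory through the codec's output stream.
      ByteArrayOutputStream compressed = new ByteArrayOutputStream();
      CompressionOutputStream cos = codec.createOutputStream(compressed);
      cos.write("hello, zlib".getBytes("UTF-8"));
      cos.finish();
      cos.close();

      // Read it back through the matching decompressor stream.
      CompressionInputStream cis =
        codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()));
      BufferedReader reader = new BufferedReader(new InputStreamReader(cis, "UTF-8"));
      System.out.println(reader.readLine());   // hello, zlib
      reader.close();
    }
  }
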
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Mon Apr 16 14:44:35 2007
@@ -73,7 +73,7 @@
     }
     
     public void write(byte[] data, int offset, int length) 
-    throws IOException {
+      throws IOException {
       out.write(data, offset, length);
     }
     
@@ -141,18 +141,18 @@
    * @return a stream the user can write uncompressed data to
    */
   public CompressionOutputStream createOutputStream(OutputStream out) 
-  throws IOException {
+    throws IOException {
     CompressionOutputStream compOutStream = null;
     
     if (ZlibFactory.isNativeZlibLoaded()) {
       Compressor compressor = 
         new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
-            ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
-            ZlibCompressor.CompressionHeader.GZIP_FORMAT,
-            64*1024); 
+                           ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
+                           ZlibCompressor.CompressionHeader.GZIP_FORMAT,
+                           64*1024); 
      
       compOutStream = new CompressorStream(out, compressor,
-                        conf.getInt("io.file.buffer.size", 4*1024)); 
+                                           conf.getInt("io.file.buffer.size", 4*1024)); 
     } else {
       compOutStream = new GzipOutputStream(out);
     }
@@ -166,16 +166,16 @@
    * @return a stream to read uncompressed bytes from
    */
   public CompressionInputStream createInputStream(InputStream in) 
-  throws IOException {
+    throws IOException {
     CompressionInputStream compInStream = null;
     
     if (ZlibFactory.isNativeZlibLoaded()) {
       Decompressor decompressor =
         new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB,
-            64*1-24);
+                             64*1-24);
 
       compInStream = new DecompressorStream(in, decompressor,
-                        conf.getInt("io.file.buffer.size", 4*1024)); 
+                                            conf.getInt("io.file.buffer.size", 4*1024)); 
     } else {
       compInStream = new GzipInputStream(in);
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java Mon Apr 16 14:44:35 2007
@@ -44,7 +44,7 @@
   }
 
   public synchronized int compress(byte[] b, int off, int len) 
-  throws IOException {
+    throws IOException {
     return super.deflate(b, off, len);
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java Mon Apr 16 14:44:35 2007
@@ -41,7 +41,7 @@
   }
 
   public synchronized int decompress(byte[] b, int off, int len) 
-  throws IOException {
+    throws IOException {
     try {
       return super.inflate(b, off, len);
     } catch (DataFormatException dfe) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Mon Apr 16 14:44:35 2007
@@ -39,7 +39,7 @@
   static {
     if (NativeCodeLoader.isNativeCodeLoaded()) {
       nativeZlibLoaded = ZlibCompressor.isNativeZlibLoaded() &&
-                          ZlibDecompressor.isNativeZlibLoaded();
+        ZlibDecompressor.isNativeZlibLoaded();
       
       if (nativeZlibLoaded) {
         LOG.info("Successfully loaded & initialized native-zlib library");
@@ -66,7 +66,7 @@
    */
   public static Compressor getZlibCompressor() {
     return (nativeZlibLoaded) ? 
-        new ZlibCompressor() : new BuiltInZlibDeflater(); 
+      new ZlibCompressor() : new BuiltInZlibDeflater(); 
   }
 
   /**
@@ -76,7 +76,7 @@
    */
   public static Decompressor getZlibDecompressor() {
     return (nativeZlibLoaded) ? 
-        new ZlibDecompressor() : new BuiltInZlibInflater(); 
+      new ZlibDecompressor() : new BuiltInZlibInflater(); 
   }
   
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Mon Apr 16 14:44:35 2007
@@ -47,7 +47,7 @@
   }
 
   public Object invoke(Object proxy, Method method, Object[] args)
-      throws Throwable {
+    throws Throwable {
     RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
     if (policy == null) {
       policy = defaultPolicy;
@@ -60,16 +60,16 @@
       } catch (Exception e) {
         if (!policy.shouldRetry(e, retries++)) {
           LOG.warn("Exception while invoking " + method.getName()
-              + " of " + implementation.getClass() + ". Not retrying."
-              + StringUtils.stringifyException(e));
+                   + " of " + implementation.getClass() + ". Not retrying."
+                   + StringUtils.stringifyException(e));
           if (!method.getReturnType().equals(Void.TYPE)) {
             throw e; // non-void methods can't fail without an exception
           }
           return null;
         }
         LOG.warn("Exception while invoking " + method.getName()
-            + " of " + implementation.getClass() + ". Retrying."
-            + StringUtils.stringifyException(e));
+                 + " of " + implementation.getClass() + ". Retrying."
+                 + StringUtils.stringifyException(e));
       }
     }
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java Mon Apr 16 14:44:35 2007
@@ -38,12 +38,12 @@
    * @return the retry proxy
    */
   public static Object create(Class<?> iface, Object implementation,
-      RetryPolicy retryPolicy) {
+                              RetryPolicy retryPolicy) {
     return Proxy.newProxyInstance(
-        implementation.getClass().getClassLoader(),
-        new Class<?>[] { iface },
-        new RetryInvocationHandler(implementation, retryPolicy)
-    );
+                                  implementation.getClass().getClassLoader(),
+                                  new Class<?>[] { iface },
+                                  new RetryInvocationHandler(implementation, retryPolicy)
+                                  );
   }  
   
   /**
@@ -59,11 +59,11 @@
    * @return the retry proxy
    */
   public static Object create(Class<?> iface, Object implementation,
-      Map<String,RetryPolicy> methodNameToPolicyMap) {
+                              Map<String,RetryPolicy> methodNameToPolicyMap) {
     return Proxy.newProxyInstance(
-        implementation.getClass().getClassLoader(),
-        new Class<?>[] { iface },
-        new RetryInvocationHandler(implementation, methodNameToPolicyMap)
-    );
+                                  implementation.getClass().getClassLoader(),
+                                  new Class<?>[] { iface },
+                                  new RetryInvocationHandler(implementation, methodNameToPolicyMap)
+                                  );
   }
 }
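
RetryProxy.create, whose argument wrapping changes above, builds a dynamic proxy that retries failed calls according to the supplied policy. A minimal usage sketch follows; FlakyService and its implementation are hypothetical names for illustration, and RetryPolicies.RETRY_FOREVER is assumed to come from the companion retry package rather than from this diff.

  import org.apache.hadoop.io.retry.RetryPolicies;
  import org.apache.hadoop.io.retry.RetryProxy;

  public class RetryProxyDemo {
    // Hypothetical service interface and flaky implementation.
    public interface FlakyService {
      String fetch() throws Exception;
    }

    public static class FlakyServiceImpl implements FlakyService {
      private int calls = 0;
      public String fetch() throws Exception {
        if (++calls < 3) {
          throw new Exception("transient failure #" + calls);
        }
        return "ok after " + calls + " attempts";
      }
    }

    public static void main(String[] args) throws Exception {
      // Every call through the proxy is retried per the supplied policy.
      FlakyService service = (FlakyService) RetryProxy.create(
          FlakyService.class, new FlakyServiceImpl(), RetryPolicies.RETRY_FOREVER);
      System.out.println(service.fetch());   // succeeds on the third attempt
    }
  }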

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java Mon Apr 16 14:44:35 2007
@@ -33,115 +33,115 @@
  */
 class SocketChannelOutputStream extends OutputStream {    
     
-    ByteBuffer buffer;
-    ByteBuffer flush;
-    SocketChannel channel;
-    Selector selector;
+  ByteBuffer buffer;
+  ByteBuffer flush;
+  SocketChannel channel;
+  Selector selector;
     
-    /* ------------------------------------------------------------------------------- */
-    /** Constructor.
-     * 
-     */
-    public SocketChannelOutputStream(SocketChannel channel)
-    {
-        this.channel = channel;
-        buffer = ByteBuffer.allocate(8); // only for small writes
-    }
-
-    /* ------------------------------------------------------------------------------- */
-    /*
-     * @see java.io.OutputStream#write(int)
-     */
-    public void write(int b) throws IOException
-    {
-        buffer.clear();
-        buffer.put((byte)b);
-        buffer.flip();
-        flush = buffer;
-        flushBuffer();
-    }
+  /* ------------------------------------------------------------------------------- */
+  /** Constructor.
+   * 
+   */
+  public SocketChannelOutputStream(SocketChannel channel)
+  {
+    this.channel = channel;
+    buffer = ByteBuffer.allocate(8); // only for small writes
+  }
+
+  /* ------------------------------------------------------------------------------- */
+  /*
+   * @see java.io.OutputStream#write(int)
+   */
+  public void write(int b) throws IOException
+  {
+    buffer.clear();
+    buffer.put((byte)b);
+    buffer.flip();
+    flush = buffer;
+    flushBuffer();
+  }
 
     
-    /* ------------------------------------------------------------------------------- */
-    /*
-     * @see java.io.OutputStream#close()
-     */
-    public void close() throws IOException
-    {
-        channel.close();
-    }
-
-    /* ------------------------------------------------------------------------------- */
-    /*
-     * @see java.io.OutputStream#flush()
-     */
-    public void flush() throws IOException
-    {
-    }
-
-    /* ------------------------------------------------------------------------------- */
-    /*
-     * @see java.io.OutputStream#write(byte[], int, int)
-     */
-    public void write(byte[] buf, int offset, int length) throws IOException
-    {
-        flush = ByteBuffer.wrap(buf,offset,length);
-        flushBuffer();
-    }
-
-    /* ------------------------------------------------------------------------------- */
-    /*
-     * @see java.io.OutputStream#write(byte[])
-     */
-    public void write(byte[] buf) throws IOException
-    {
-        flush = ByteBuffer.wrap(buf);
-        flushBuffer();
-    }
-
-
-    /* ------------------------------------------------------------------------------- */
-    private void flushBuffer() throws IOException
-    {
-        while (flush.hasRemaining())
-        {
-            int len = channel.write(flush);
+  /* ------------------------------------------------------------------------------- */
+  /*
+   * @see java.io.OutputStream#close()
+   */
+  public void close() throws IOException
+  {
+    channel.close();
+  }
+
+  /* ------------------------------------------------------------------------------- */
+  /*
+   * @see java.io.OutputStream#flush()
+   */
+  public void flush() throws IOException
+  {
+  }
+
+  /* ------------------------------------------------------------------------------- */
+  /*
+   * @see java.io.OutputStream#write(byte[], int, int)
+   */
+  public void write(byte[] buf, int offset, int length) throws IOException
+  {
+    flush = ByteBuffer.wrap(buf,offset,length);
+    flushBuffer();
+  }
+
+  /* ------------------------------------------------------------------------------- */
+  /*
+   * @see java.io.OutputStream#write(byte[])
+   */
+  public void write(byte[] buf) throws IOException
+  {
+    flush = ByteBuffer.wrap(buf);
+    flushBuffer();
+  }
+
+
+  /* ------------------------------------------------------------------------------- */
+  private void flushBuffer() throws IOException
+  {
+    while (flush.hasRemaining())
+      {
+        int len = channel.write(flush);
+        if (len < 0)
+          throw new IOException("EOF");
+        if (len == 0)
+          {
+            // write channel full.  Try letting other threads have a go.
+            Thread.yield();
+            len = channel.write(flush);
             if (len < 0)
-                throw new IOException("EOF");
+              throw new IOException("EOF");
             if (len == 0)
-            {
-                // write channel full.  Try letting other threads have a go.
-                Thread.yield();
-                len = channel.write(flush);
-                if (len < 0)
-                    throw new IOException("EOF");
-                if (len == 0)
-                {
-                    // still full.  need to  block until it is writable.
-                    if (selector==null)
-                     {
-                            selector = Selector.open();
-                            channel.register(selector, SelectionKey.OP_WRITE);
-                     }
-
-                     selector.select();
-                }
-            }
-        }
+              {
+                // still full.  need to  block until it is writable.
+                if (selector==null)
+                  {
+                    selector = Selector.open();
+                    channel.register(selector, SelectionKey.OP_WRITE);
+                  }
+
+                selector.select();
+              }
+          }
+      }
+    flush = null;
+  }
+
+  /* ------------------------------------------------------------------------------- */
+  public void destroy()
+  {
+    if (selector != null)
+      {
+        try{ selector.close();}
+        catch(IOException e){}
+        selector = null;
+        buffer = null;
         flush = null;
-    }
-
-    /* ------------------------------------------------------------------------------- */
-    public void destroy()
-    {
-        if (selector != null)
-        {
-            try{ selector.close();}
-            catch(IOException e){}
-            selector = null;
-            buffer = null;
-            flush = null;
-            channel = null;
-        }
-    }
+        channel = null;
+      }
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Mon Apr 16 14:44:35 2007
@@ -144,7 +144,7 @@
       return groupCounters.size();
     }
     
-        /**
+    /**
      * Looks up key in the ResourceBundle and returns the corresponding value.
      * If the bundle or the key doesn't exist, returns the default value.
      */
@@ -167,7 +167,7 @@
   // Map from group name (enum class name) to map of int (enum ordinal) to
   // counter record (name-value pair).
   private Map<String,Map<Integer,CounterRec>> counters =
-          new TreeMap<String,Map<Integer,CounterRec>>();
+    new TreeMap<String,Map<Integer,CounterRec>>();
   
   /**
    * Returns the names of all counter classes.
@@ -287,11 +287,11 @@
    * Convenience method for computing the sum of two sets of counters.
    */
   public static Counters sum(Counters a, Counters b) {
-      Counters counters = new Counters();
-      counters.incrAllCounters(a);
-      counters.incrAllCounters(b);
-      return counters;
-    }
+    Counters counters = new Counters();
+    counters.incrAllCounters(a);
+    counters.incrAllCounters(b);
+    return counters;
+  }
   
   /**
    * Returns the total number of counters, by summing the number of counters

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon Apr 16 14:44:35 2007
@@ -62,8 +62,8 @@
    *         fresh instructions.
    */
   HeartbeatResponse heartbeat(TaskTrackerStatus status, 
-          boolean initialContact, boolean acceptNewTasks, short responseId)
-  throws IOException;
+                              boolean initialContact, boolean acceptNewTasks, short responseId)
+    throws IOException;
 
   /**
    * The task tracker calls this once, to discern where it can find
@@ -92,7 +92,7 @@
    * @throws IOException
    */
   TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId, int maxEvents) throws IOException;
+                                                String jobid, int fromEventId, int maxEvents) throws IOException;
   
 }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Apr 16 14:44:35 2007
@@ -38,766 +38,766 @@
  * @author Mike Cafarella
  *******************************************************/
 public class JobClient extends ToolBase implements MRConstants  {
-    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
-    public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL }
-    private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
-
-    static long MAX_JOBPROFILE_AGE = 1000 * 2;
-
-    /**
-     * A NetworkedJob is an implementation of RunningJob.  It holds
-     * a JobProfile object to provide some info, and interacts with the
-     * remote service to provide certain functionality.
-     */
-    class NetworkedJob implements RunningJob {
-        JobProfile profile;
-        JobStatus status;
-        long statustime;
-
-        /**
-         * We store a JobProfile and a timestamp for when we last
-         * acquired the job profile.  If the job is null, then we cannot
-         * perform any of the tasks.  The job might be null if the JobTracker
-         * has completely forgotten about the job.  (eg, 24 hours after the
-         * job completes.)
-         */
-        public NetworkedJob(JobStatus job) throws IOException {
-            this.status = job;
-            this.profile = jobSubmitClient.getJobProfile(job.getJobId());
-            this.statustime = System.currentTimeMillis();
-        }
-
-        /**
-         * Some methods rely on having a recent job profile object.  Refresh
-         * it, if necessary
-         */
-        synchronized void ensureFreshStatus() throws IOException {
-            if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
-                this.status = jobSubmitClient.getJobStatus(profile.getJobId());
-                this.statustime = System.currentTimeMillis();
-            }
-        }
-
-        /**
-         * An identifier for the job
-         */
-        public String getJobID() {
-            return profile.getJobId();
-        }
-
-        /**
-         * The name of the job file
-         */
-        public String getJobFile() {
-            return profile.getJobFile();
-        }
-
-        /**
-         * A URL where the job's status can be seen
-         */
-        public String getTrackingURL() {
-            return profile.getURL().toString();
-        }
-
-        /**
-         * A float between 0.0 and 1.0, indicating the % of map work
-         * completed.
-         */
-        public float mapProgress() throws IOException {
-            ensureFreshStatus();
-            return status.mapProgress();
-        }
-
-        /**
-         * A float between 0.0 and 1.0, indicating the % of reduce work
-         * completed.
-         */
-        public float reduceProgress() throws IOException {
-            ensureFreshStatus();
-            return status.reduceProgress();
-        }
+  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
+  public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL }
+  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
 
-        /**
-         * Returns immediately whether the whole job is done yet or not.
-         */
-        public synchronized boolean isComplete() throws IOException {
-            ensureFreshStatus();
-            return (status.getRunState() == JobStatus.SUCCEEDED ||
-                    status.getRunState() == JobStatus.FAILED);
-        }
+  static long MAX_JOBPROFILE_AGE = 1000 * 2;
 
-        /**
-         * True iff job completed successfully.
-         */
-        public synchronized boolean isSuccessful() throws IOException {
-            ensureFreshStatus();
-            return status.getRunState() == JobStatus.SUCCEEDED;
-        }
+  /**
+   * A NetworkedJob is an implementation of RunningJob.  It holds
+   * a JobProfile object to provide some info, and interacts with the
+   * remote service to provide certain functionality.
+   */
+  class NetworkedJob implements RunningJob {
+    JobProfile profile;
+    JobStatus status;
+    long statustime;
 
-        /**
-         * Blocks until the job is finished
-         */
-        public synchronized void waitForCompletion() throws IOException {
-            while (! isComplete()) {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException ie) {
-                }
-            }
-        }
+    /**
+     * We store a JobProfile and a timestamp for when we last
+     * acquired the job profile.  If the job is null, then we cannot
+     * perform any of the tasks.  The job might be null if the JobTracker
+     * has completely forgotten about the job.  (eg, 24 hours after the
+     * job completes.)
+     */
+    public NetworkedJob(JobStatus job) throws IOException {
+      this.status = job;
+      this.profile = jobSubmitClient.getJobProfile(job.getJobId());
+      this.statustime = System.currentTimeMillis();
+    }
 
-        /**
-         * Tells the service to terminate the current job.
-         */
-        public synchronized void killJob() throws IOException {
-            jobSubmitClient.killJob(getJobID());
-        }
-        /**
-         * Fetch task completion events from jobtracker for this job. 
-         */
-        public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-            int startFrom) throws IOException{
-          return jobSubmitClient.getTaskCompletionEvents(
-              getJobID(), startFrom, 10); 
-        }
+    /**
+     * Some methods rely on having a recent job profile object.  Refresh
+     * it, if necessary
+     */
+    synchronized void ensureFreshStatus() throws IOException {
+      if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
+        this.status = jobSubmitClient.getJobStatus(profile.getJobId());
+        this.statustime = System.currentTimeMillis();
+      }
+    }
 
-        /**
-         * Dump stats to screen
-         */
-        public String toString() {
-            try {
-                ensureFreshStatus();
-            } catch (IOException e) {
-            }
-            return "Job: " + profile.getJobId() + "\n" + 
-                "file: " + profile.getJobFile() + "\n" + 
-                "tracking URL: " + profile.getURL() + "\n" + 
-                "map() completion: " + status.mapProgress() + "\n" + 
-                "reduce() completion: " + status.reduceProgress();
-        }
-        
-        /**
-         * Returns the counters for this job
-         */
-        public Counters getCounters() throws IOException {
-          return jobSubmitClient.getJobCounters(getJobID());
-        }
+    /**
+     * An identifier for the job
+     */
+    public String getJobID() {
+      return profile.getJobId();
     }
 
-    JobSubmissionProtocol jobSubmitClient;
-    FileSystem fs = null;
+    /**
+     * The name of the job file
+     */
+    public String getJobFile() {
+      return profile.getJobFile();
+    }
 
-    static Random r = new Random();
+    /**
+     * A URL where the job's status can be seen
+     */
+    public String getTrackingURL() {
+      return profile.getURL().toString();
+    }
 
     /**
-     * Build a job client, connect to the default job tracker
+     * A float between 0.0 and 1.0, indicating the % of map work
+     * completed.
      */
-    public JobClient() {
+    public float mapProgress() throws IOException {
+      ensureFreshStatus();
+      return status.mapProgress();
     }
-    
-    public JobClient(Configuration conf) throws IOException {
-        setConf(conf);
-        init();
+
+    /**
+     * A float between 0.0 and 1.0, indicating the % of reduce work
+     * completed.
+     */
+    public float reduceProgress() throws IOException {
+      ensureFreshStatus();
+      return status.reduceProgress();
     }
-    
-    public void init() throws IOException {
-        String tracker = conf.get("mapred.job.tracker", "local");
-        if ("local".equals(tracker)) {
-          this.jobSubmitClient = new LocalJobRunner(conf);
-        } else {
-          this.jobSubmitClient = (JobSubmissionProtocol) 
-            RPC.getProxy(JobSubmissionProtocol.class,
-                         JobSubmissionProtocol.versionID,
-                         JobTracker.getAddress(conf), conf);
-        }        
+
+    /**
+     * Returns immediately whether the whole job is done yet or not.
+     */
+    public synchronized boolean isComplete() throws IOException {
+      ensureFreshStatus();
+      return (status.getRunState() == JobStatus.SUCCEEDED ||
+              status.getRunState() == JobStatus.FAILED);
     }
-  
+
     /**
-     * Build a job client, connect to the indicated job tracker.
+     * True iff job completed successfully.
      */
-    public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
-        this.jobSubmitClient = (JobSubmissionProtocol) 
-            RPC.getProxy(JobSubmissionProtocol.class,
-                         JobSubmissionProtocol.versionID, jobTrackAddr, conf);
+    public synchronized boolean isSuccessful() throws IOException {
+      ensureFreshStatus();
+      return status.getRunState() == JobStatus.SUCCEEDED;
     }
 
+    /**
+     * Blocks until the job is finished
+     */
+    public synchronized void waitForCompletion() throws IOException {
+      while (! isComplete()) {
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
 
     /**
+     * Tells the service to terminate the current job.
      */
-    public synchronized void close() throws IOException {
+    public synchronized void killJob() throws IOException {
+      jobSubmitClient.killJob(getJobID());
+    }
+    /**
+     * Fetch task completion events from jobtracker for this job. 
+     */
+    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+                                                                      int startFrom) throws IOException{
+      return jobSubmitClient.getTaskCompletionEvents(
+                                                     getJobID(), startFrom, 10); 
     }
 
     /**
-     * Get a filesystem handle.  We need this to prepare jobs
-     * for submission to the MapReduce system.
+     * Dump stats to screen
      */
-    public synchronized FileSystem getFs() throws IOException {
-      if (this.fs == null) {
-        String fsName = jobSubmitClient.getFilesystemName();
-        this.fs = FileSystem.getNamed(fsName, this.conf);
+    public String toString() {
+      try {
+        ensureFreshStatus();
+      } catch (IOException e) {
       }
-      return fs;
+      return "Job: " + profile.getJobId() + "\n" + 
+        "file: " + profile.getJobFile() + "\n" + 
+        "tracking URL: " + profile.getURL() + "\n" + 
+        "map() completion: " + status.mapProgress() + "\n" + 
+        "reduce() completion: " + status.reduceProgress();
     }
-
+        
     /**
-     * Submit a job to the MR system
+     * Returns the counters for this job
      */
-    public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
-      InvalidJobConfException,IOException {
-        // Load in the submitted job details
-        JobConf job = new JobConf(jobFile);
-        return submitJob(job);
+    public Counters getCounters() throws IOException {
+      return jobSubmitClient.getJobCounters(getJobID());
     }
+  }
+
+  JobSubmissionProtocol jobSubmitClient;
+  FileSystem fs = null;
+
+  static Random r = new Random();
+
+  /**
+   * Build a job client, connect to the default job tracker
+   */
+  public JobClient() {
+  }
+    
+  public JobClient(Configuration conf) throws IOException {
+    setConf(conf);
+    init();
+  }
+    
+  public void init() throws IOException {
+    String tracker = conf.get("mapred.job.tracker", "local");
+    if ("local".equals(tracker)) {
+      this.jobSubmitClient = new LocalJobRunner(conf);
+    } else {
+      this.jobSubmitClient = (JobSubmissionProtocol) 
+        RPC.getProxy(JobSubmissionProtocol.class,
+                     JobSubmissionProtocol.versionID,
+                     JobTracker.getAddress(conf), conf);
+    }        
+  }
+  
+  /**
+   * Build a job client, connect to the indicated job tracker.
+   */
+  public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
+    this.jobSubmitClient = (JobSubmissionProtocol) 
+      RPC.getProxy(JobSubmissionProtocol.class,
+                   JobSubmissionProtocol.versionID, jobTrackAddr, conf);
+  }
+
+
+  /**
+   */
+  public synchronized void close() throws IOException {
+  }
+
+  /**
+   * Get a filesystem handle.  We need this to prepare jobs
+   * for submission to the MapReduce system.
+   */
+  public synchronized FileSystem getFs() throws IOException {
+    if (this.fs == null) {
+      String fsName = jobSubmitClient.getFilesystemName();
+      this.fs = FileSystem.getNamed(fsName, this.conf);
+    }
+    return fs;
+  }
+
+  /**
+   * Submit a job to the MR system
+   */
+  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
+                                                     InvalidJobConfException,IOException {
+    // Load in the submitted job details
+    JobConf job = new JobConf(jobFile);
+    return submitJob(job);
+  }
     
    
-    /**
-     * Submit a job to the MR system
-     */
-    public RunningJob submitJob(JobConf job) throws FileNotFoundException, 
-      InvalidJobConfException, IOException {
-        //
-        // First figure out what fs the JobTracker is using.  Copy the
-        // job to it, under a temporary name.  This allows DFS to work,
-        // and under the local fs also provides UNIX-like object loading 
-        // semantics.  (that is, if the job file is deleted right after
-        // submission, we can still run the submission to completion)
-        //
-
-        // Create a number of filenames in the JobTracker's fs namespace
-        Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
-        Path submitJobFile = new Path(submitJobDir, "job.xml");
-        Path submitJarFile = new Path(submitJobDir, "job.jar");
-        Path submitSplitFile = new Path(submitJobDir, "job.split");
+  /**
+   * Submit a job to the MR system
+   */
+  public RunningJob submitJob(JobConf job) throws FileNotFoundException, 
+                                                  InvalidJobConfException, IOException {
+    //
+    // First figure out what fs the JobTracker is using.  Copy the
+    // job to it, under a temporary name.  This allows DFS to work,
+    // and under the local fs also provides UNIX-like object loading 
+    // semantics.  (that is, if the job file is deleted right after
+    // submission, we can still run the submission to completion)
+    //
+
+    // Create a number of filenames in the JobTracker's fs namespace
+    Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
+    Path submitJobFile = new Path(submitJobDir, "job.xml");
+    Path submitJarFile = new Path(submitJobDir, "job.jar");
+    Path submitSplitFile = new Path(submitJobDir, "job.split");
         
-        FileSystem fs = getFs();
-        LOG.debug("default FileSystem: " + fs.getUri());
-        // try getting the md5 of the archives
-        URI[] tarchives = DistributedCache.getCacheArchives(job);
-        URI[] tfiles = DistributedCache.getCacheFiles(job);
-        if ((tarchives != null) || (tfiles != null)) {
-          // prepare these archives for md5 checksums
-          if (tarchives != null) {
-            String md5Archives = StringUtils.byteToHexString(DistributedCache
-                .createMD5(tarchives[0], job));
-            for (int i = 1; i < tarchives.length; i++) {
-              md5Archives = md5Archives
-                  + ","
-                  + StringUtils.byteToHexString(DistributedCache
-                      .createMD5(tarchives[i], job));
-            }
-            DistributedCache.setArchiveMd5(job, md5Archives);
-            //job.set("mapred.cache.archivemd5", md5Archives);
-          }
-          if (tfiles != null) {
-            String md5Files = StringUtils.byteToHexString(DistributedCache
-                .createMD5(tfiles[0], job));
-            for (int i = 1; i < tfiles.length; i++) {
-              md5Files = md5Files
-                  + ","
-                  + StringUtils.byteToHexString(DistributedCache
-                      .createMD5(tfiles[i], job));
-            }
-            DistributedCache.setFileMd5(job, md5Files);
-            //"mapred.cache.filemd5", md5Files);
-          }
+    FileSystem fs = getFs();
+    LOG.debug("default FileSystem: " + fs.getUri());
+    // try getting the md5 of the archives
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    if ((tarchives != null) || (tfiles != null)) {
+      // prepare these archives for md5 checksums
+      if (tarchives != null) {
+        String md5Archives = StringUtils.byteToHexString(DistributedCache
+                                                         .createMD5(tarchives[0], job));
+        for (int i = 1; i < tarchives.length; i++) {
+          md5Archives = md5Archives
+            + ","
+            + StringUtils.byteToHexString(DistributedCache
+                                          .createMD5(tarchives[i], job));
+        }
+        DistributedCache.setArchiveMd5(job, md5Archives);
+        //job.set("mapred.cache.archivemd5", md5Archives);
+      }
+      if (tfiles != null) {
+        String md5Files = StringUtils.byteToHexString(DistributedCache
+                                                      .createMD5(tfiles[0], job));
+        for (int i = 1; i < tfiles.length; i++) {
+          md5Files = md5Files
+            + ","
+            + StringUtils.byteToHexString(DistributedCache
+                                          .createMD5(tfiles[i], job));
         }
+        DistributedCache.setFileMd5(job, md5Files);
+        //"mapred.cache.filemd5", md5Files);
+      }
+    }
        
-        String originalJarPath = job.getJar();
-        short replication = (short)job.getInt("mapred.submit.replication", 10);
-
-        if (originalJarPath != null) {           // copy jar to JobTracker's fs
-          // use jar name if job is not named. 
-          if( "".equals(job.getJobName() )){
-            job.setJobName(new Path(originalJarPath).getName());
-          }
-          job.setJar(submitJarFile.toString());
-          fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
-          fs.setReplication(submitJarFile, replication);
-        }
+    String originalJarPath = job.getJar();
+    short replication = (short)job.getInt("mapred.submit.replication", 10);
 
-        // Set the user's name and working directory
-        String user = System.getProperty("user.name");
-        job.setUser(user != null ? user : "Dr Who");
-        if (job.getWorkingDirectory() == null) {
-          job.setWorkingDirectory(fs.getWorkingDirectory());          
-        }
-
-        // Check the input specification 
-        job.getInputFormat().validateInput(job);
-
-        // Check the output specification
-        job.getOutputFormat().checkOutputSpecs(fs, job);
-
-        // Create the splits for the job
-        LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-        InputSplit[] splits = 
-          job.getInputFormat().getSplits(job, job.getNumMapTasks());
-        // sort the splits into order based on size, so that the biggest
-        // go first
-        Arrays.sort(splits, new Comparator() {
-          public int compare(Object a, Object b) {
-            try {
-              long left = ((InputSplit) a).getLength();
-              long right = ((InputSplit) b).getLength();
-              if (left == right) {
-                return 0;
-              } else if (left < right) {
-                return 1;
-              } else {
-                return -1;
-              }
-            } catch (IOException ie) {
-              throw new RuntimeException("Problem getting input split size",
-                                         ie);
+    if (originalJarPath != null) {           // copy jar to JobTracker's fs
+      // use jar name if job is not named. 
+      if( "".equals(job.getJobName() )){
+        job.setJobName(new Path(originalJarPath).getName());
+      }
+      job.setJar(submitJarFile.toString());
+      fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
+      fs.setReplication(submitJarFile, replication);
+    }
+
+    // Set the user's name and working directory
+    String user = System.getProperty("user.name");
+    job.setUser(user != null ? user : "Dr Who");
+    if (job.getWorkingDirectory() == null) {
+      job.setWorkingDirectory(fs.getWorkingDirectory());          
+    }
+
+    // Check the input specification 
+    job.getInputFormat().validateInput(job);
+
+    // Check the output specification
+    job.getOutputFormat().checkOutputSpecs(fs, job);
+
+    // Create the splits for the job
+    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+    InputSplit[] splits = 
+      job.getInputFormat().getSplits(job, job.getNumMapTasks());
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(splits, new Comparator() {
+        public int compare(Object a, Object b) {
+          try {
+            long left = ((InputSplit) a).getLength();
+            long right = ((InputSplit) b).getLength();
+            if (left == right) {
+              return 0;
+            } else if (left < right) {
+              return 1;
+            } else {
+              return -1;
             }
+          } catch (IOException ie) {
+            throw new RuntimeException("Problem getting input split size",
+                                       ie);
           }
-        });
-        // write the splits to a file for the job tracker
-        FSDataOutputStream out = fs.create(submitSplitFile);
-        try {
-          writeSplitsFile(splits, out);
-        } finally {
-          out.close();
-        }
-        job.set("mapred.job.split.file", submitSplitFile.toString());
-        job.setNumMapTasks(splits.length);
-        
-        // Write job file to JobTracker's fs        
-        out = fs.create(submitJobFile, replication);
-        try {
-          job.write(out);
-        } finally {
-          out.close();
-        }
-
-        //
-        // Now, actually submit the job (using the submit name)
-        //
-        JobStatus status = jobSubmitClient.submitJob(submitJobFile.toString());
-        if (status != null) {
-            return new NetworkedJob(status);
-        } else {
-            throw new IOException("Could not launch job");
         }
+      });
+    // write the splits to a file for the job tracker
+    FSDataOutputStream out = fs.create(submitSplitFile);
+    try {
+      writeSplitsFile(splits, out);
+    } finally {
+      out.close();
     }
-
-    static class RawSplit implements Writable {
-      private String splitClass;
-      private BytesWritable bytes = new BytesWritable();
-      private String[] locations;
+    job.set("mapred.job.split.file", submitSplitFile.toString());
+    job.setNumMapTasks(splits.length);
+        
+    // Write job file to JobTracker's fs        
+    out = fs.create(submitJobFile, replication);
+    try {
+      job.write(out);
+    } finally {
+      out.close();
+    }
+
+    //
+    // Now, actually submit the job (using the submit name)
+    //
+    JobStatus status = jobSubmitClient.submitJob(submitJobFile.toString());
+    if (status != null) {
+      return new NetworkedJob(status);
+    } else {
+      throw new IOException("Could not launch job");
+    }
+  }
+
+  static class RawSplit implements Writable {
+    private String splitClass;
+    private BytesWritable bytes = new BytesWritable();
+    private String[] locations;
       
-      public void setBytes(byte[] data, int offset, int length) {
-        bytes.set(data, offset, length);
-      }
+    public void setBytes(byte[] data, int offset, int length) {
+      bytes.set(data, offset, length);
+    }
 
-      public void setClassName(String className) {
-        splitClass = className;
-      }
-      
-      public String getClassName() {
-        return splitClass;
-      }
-      
-      public BytesWritable getBytes() {
-        return bytes;
-      }
-      
-      public void setLocations(String[] locations) {
-        this.locations = locations;
-      }
+    public void setClassName(String className) {
+      splitClass = className;
+    }
       
-      public String[] getLocations() {
-        return locations;
-      }
+    public String getClassName() {
+      return splitClass;
+    }
       
-      public void readFields(DataInput in) throws IOException {
-        splitClass = Text.readString(in);
-        bytes.readFields(in);
-        int len = WritableUtils.readVInt(in);
-        locations = new String[len];
-        for(int i=0; i < len; ++i) {
-          locations[i] = Text.readString(in);
-        }
-      }
+    public BytesWritable getBytes() {
+      return bytes;
+    }
       
-      public void write(DataOutput out) throws IOException {
-        Text.writeString(out, splitClass);
-        bytes.write(out);
-        WritableUtils.writeVInt(out, locations.length);
-        for(int i = 0; i < locations.length; i++) {
-          Text.writeString(out, locations[i]);
-        }        
-      }
+    public void setLocations(String[] locations) {
+      this.locations = locations;
     }
-    
-    private static final int CURRENT_SPLIT_FILE_VERSION = 0;
-    private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-    
-    /** Create the list of input splits and write them out in a file for
-     *the JobTracker. The format is:
-     * <format version>
-     * <numSplits>
-     * for each split:
-     *    <RawSplit>
-     * @param splits the input splits to write out
-     * @param out the stream to write to
-     */
-    private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
-      out.write(SPLIT_FILE_HEADER);
-      WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-      WritableUtils.writeVInt(out, splits.length);
-      DataOutputBuffer buffer = new DataOutputBuffer();
-      RawSplit rawSplit = new RawSplit();
-      for(InputSplit split: splits) {
-        rawSplit.setClassName(split.getClass().getName());
-        buffer.reset();
-        split.write(buffer);
-        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-        rawSplit.setLocations(split.getLocations());
-        rawSplit.write(out);
-      }
+      
+    public String[] getLocations() {
+      return locations;
     }
-
-    /**
-     * Read a splits file into a list of raw splits
-     * @param in the stream to read from
-     * @return the complete list of splits
-     * @throws IOException
-     */
-    static RawSplit[] readSplitFile(DataInput in) throws IOException {
-      byte[] header = new byte[SPLIT_FILE_HEADER.length];
-      in.readFully(header);
-      if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
-        throw new IOException("Invalid header on split file");
-      }
-      int vers = WritableUtils.readVInt(in);
-      if (vers != CURRENT_SPLIT_FILE_VERSION) {
-        throw new IOException("Unsupported split version " + vers);
-      }
+      
+    public void readFields(DataInput in) throws IOException {
+      splitClass = Text.readString(in);
+      bytes.readFields(in);
       int len = WritableUtils.readVInt(in);
-      RawSplit[] result = new RawSplit[len];
+      locations = new String[len];
       for(int i=0; i < len; ++i) {
-        result[i] = new RawSplit();
-        result[i].readFields(in);
+        locations[i] = Text.readString(in);
       }
-      return result;
-    }
-    
-    /**
-     * Get an RunningJob object to track an ongoing job.  Returns
-     * null if the id does not correspond to any known job.
-     */
-    public RunningJob getJob(String jobid) throws IOException {
-        JobStatus status = jobSubmitClient.getJobStatus(jobid);
-        if (status != null) {
-            return new NetworkedJob(status);
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Get the information of the current state of the map tasks of a job.
-     * @param jobId the job to query
-     * @return the list of all of the map tips
-     */
-    public TaskReport[] getMapTaskReports(String jobId) throws IOException {
-      return jobSubmitClient.getMapTaskReports(jobId);
-    }
-    
-    /**
-     * Get the information of the current state of the reduce tasks of a job.
-     * @param jobId the job to query
-     * @return the list of all of the map tips
-     */    
-    public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
-      return jobSubmitClient.getReduceTaskReports(jobId);
     }
-    
-    public ClusterStatus getClusterStatus() throws IOException {
-      return jobSubmitClient.getClusterStatus();
-    }
-    
-    public JobStatus[] jobsToComplete() throws IOException {
-      return jobSubmitClient.jobsToComplete();
-    }
-    
-    /** Utility that submits a job, then polls for progress until the job is
-     * complete. */
-    public static void runJob(JobConf job) throws IOException {
-      JobClient jc = new JobClient(job);
-      boolean error = true;
-      RunningJob running = null;
-      String lastReport = null;
-      final int MAX_RETRIES = 5;
-      int retries = MAX_RETRIES;
-      TaskStatusFilter filter;
-      try {
-        filter = getTaskOutputFilter(job);
-      } catch(IllegalArgumentException e) {
-        LOG.warn("Invalid Output filter : " + e.getMessage() + 
-        " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
-        throw e;
-      }
-      try {
-        running = jc.submitJob(job);
-        String jobId = running.getJobID();
-        LOG.info("Running job: " + jobId);
-        int eventCounter = 0 ; 
+      
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, splitClass);
+      bytes.write(out);
+      WritableUtils.writeVInt(out, locations.length);
+      for(int i = 0; i < locations.length; i++) {
+        Text.writeString(out, locations[i]);
+      }        
+    }
+  }
+    
+  private static final int CURRENT_SPLIT_FILE_VERSION = 0;
+  private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
+    
+  /** Create the list of input splits and write them out in a file for
+   * the JobTracker. The format is:
+   * <format version>
+   * <numSplits>
+   * for each split:
+   *    <RawSplit>
+   * @param splits the input splits to write out
+   * @param out the stream to write to
+   */
+  private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
+    out.write(SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
+    WritableUtils.writeVInt(out, splits.length);
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    RawSplit rawSplit = new RawSplit();
+    for(InputSplit split: splits) {
+      rawSplit.setClassName(split.getClass().getName());
+      buffer.reset();
+      split.write(buffer);
+      rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+      rawSplit.setLocations(split.getLocations());
+      rawSplit.write(out);
+    }
+  }
+
+  /**
+   * Read a splits file into a list of raw splits
+   * @param in the stream to read from
+   * @return the complete list of splits
+   * @throws IOException
+   */
+  static RawSplit[] readSplitFile(DataInput in) throws IOException {
+    byte[] header = new byte[SPLIT_FILE_HEADER.length];
+    in.readFully(header);
+    if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
+      throw new IOException("Invalid header on split file");
+    }
+    int vers = WritableUtils.readVInt(in);
+    if (vers != CURRENT_SPLIT_FILE_VERSION) {
+      throw new IOException("Unsupported split version " + vers);
+    }
+    int len = WritableUtils.readVInt(in);
+    RawSplit[] result = new RawSplit[len];
+    for(int i=0; i < len; ++i) {
+      result[i] = new RawSplit();
+      result[i].readFields(in);
+    }
+    return result;
+  }
+    
+  /**
+   * Get a RunningJob object to track an ongoing job.  Returns
+   * null if the id does not correspond to any known job.
+   */
+  public RunningJob getJob(String jobid) throws IOException {
+    JobStatus status = jobSubmitClient.getJobStatus(jobid);
+    if (status != null) {
+      return new NetworkedJob(status);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get the information of the current state of the map tasks of a job.
+   * @param jobId the job to query
+   * @return the list of all of the map tips
+   */
+  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
+    return jobSubmitClient.getMapTaskReports(jobId);
+  }
+    
+  /**
+   * Get the information of the current state of the reduce tasks of a job.
+   * @param jobId the job to query
+   * @return the list of all of the reduce tips
+   */    
+  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
+    return jobSubmitClient.getReduceTaskReports(jobId);
+  }
+    
+  public ClusterStatus getClusterStatus() throws IOException {
+    return jobSubmitClient.getClusterStatus();
+  }
+    
+  public JobStatus[] jobsToComplete() throws IOException {
+    return jobSubmitClient.jobsToComplete();
+  }
+    
+  /** Utility that submits a job, then polls for progress until the job is
+   * complete. */
+  public static void runJob(JobConf job) throws IOException {
+    JobClient jc = new JobClient(job);
+    boolean error = true;
+    RunningJob running = null;
+    String lastReport = null;
+    final int MAX_RETRIES = 5;
+    int retries = MAX_RETRIES;
+    TaskStatusFilter filter;
+    try {
+      filter = getTaskOutputFilter(job);
+    } catch(IllegalArgumentException e) {
+      LOG.warn("Invalid Output filter : " + e.getMessage() + 
+               " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
+      throw e;
+    }
+    try {
+      running = jc.submitJob(job);
+      String jobId = running.getJobID();
+      LOG.info("Running job: " + jobId);
+      int eventCounter = 0 ; 
         
-        while (true) {
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {}
-          try {
-            if (running.isComplete()) {
-              break;
-            }
-            running = jc.getJob(jobId);
-            String report = 
-              (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
-               " reduce " + 
-               StringUtils.formatPercent(running.reduceProgress(), 0));
-            if (!report.equals(lastReport)) {
-              LOG.info(report);
-              lastReport = report;
-            }
+      while (true) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {}
+        try {
+          if (running.isComplete()) {
+            break;
+          }
+          running = jc.getJob(jobId);
+          String report = 
+            (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
+             " reduce " + 
+             StringUtils.formatPercent(running.reduceProgress(), 0));
+          if (!report.equals(lastReport)) {
+            LOG.info(report);
+            lastReport = report;
+          }
             
-            if( filter  != TaskStatusFilter.NONE){
-              TaskCompletionEvent[] events = 
-                running.getTaskCompletionEvents(eventCounter); 
-              eventCounter += events.length ;
-              for(TaskCompletionEvent event : events ){
-                switch( filter ){
-                case SUCCEEDED:
-                  if( event.getTaskStatus() == 
+          if( filter  != TaskStatusFilter.NONE){
+            TaskCompletionEvent[] events = 
+              running.getTaskCompletionEvents(eventCounter); 
+            eventCounter += events.length ;
+            for(TaskCompletionEvent event : events ){
+              switch( filter ){
+              case SUCCEEDED:
+                if( event.getTaskStatus() == 
                     TaskCompletionEvent.Status.SUCCEEDED){
-                    LOG.info(event.toString());
-                    displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
-                  }
-                  break; 
-                case FAILED:
-                  if( event.getTaskStatus() == 
+                  LOG.info(event.toString());
+                  displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+                }
+                break; 
+              case FAILED:
+                if( event.getTaskStatus() == 
                     TaskCompletionEvent.Status.FAILED){
-                    LOG.info(event.toString());
-                    displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
-                  }
-                  break ; 
-                case ALL:
                   LOG.info(event.toString());
                   displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
-                  break;
                 }
+                break ; 
+              case ALL:
+                LOG.info(event.toString());
+                displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+                break;
               }
             }
-            retries = MAX_RETRIES;
-          } catch (IOException ie) {
-            if (--retries == 0) {
-              LOG.warn("Final attempt failed, killing job.");
-              throw ie;
-            }
-            LOG.info("Communication problem with server: " +
-                     StringUtils.stringifyException(ie));
           }
+          retries = MAX_RETRIES;
+        } catch (IOException ie) {
+          if (--retries == 0) {
+            LOG.warn("Final attempt failed, killing job.");
+            throw ie;
+          }
+          LOG.info("Communication problem with server: " +
+                   StringUtils.stringifyException(ie));
         }
-        if (!running.isSuccessful()) {
-          throw new IOException("Job failed!");
-        }
-        LOG.info("Job complete: " + jobId);
-        running.getCounters().log(LOG);
-        error = false;
-      } finally {
-        if (error && (running != null)) {
-          running.killJob();
-        }
-        jc.close();
       }
-      
+      if (!running.isSuccessful()) {
+        throw new IOException("Job failed!");
+      }
+      LOG.info("Job complete: " + jobId);
+      running.getCounters().log(LOG);
+      error = false;
+    } finally {
+      if (error && (running != null)) {
+        running.killJob();
+      }
+      jc.close();
     }
+      
+  }
 
-    private static void displayTaskLogs(String taskId, String baseUrl)
+  private static void displayTaskLogs(String taskId, String baseUrl)
     throws IOException {
-      // The tasktracker for a 'failed/killed' job might not be around...
-      if (baseUrl != null) {
-        // Copy tasks's stdout of the JobClient
-        getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
+    // The tasktracker for a 'failed/killed' job might not be around...
+    if (baseUrl != null) {
+      // Copy tasks's stdout of the JobClient
+      getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
         
-        // Copy task's stderr to stderr of the JobClient 
-        getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
-      }
+      // Copy task's stderr to stderr of the JobClient 
+      getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
     }
+  }
     
-    private static void getTaskLogs(String taskId, URL taskLogUrl, 
-            OutputStream out) {
+  private static void getTaskLogs(String taskId, URL taskLogUrl, 
+                                  OutputStream out) {
+    try {
+      URLConnection connection = taskLogUrl.openConnection();
+      BufferedReader input = 
+        new BufferedReader(new InputStreamReader(connection.getInputStream()));
+      BufferedWriter output = 
+        new BufferedWriter(new OutputStreamWriter(out));
       try {
-        URLConnection connection = taskLogUrl.openConnection();
-        BufferedReader input = 
-          new BufferedReader(new InputStreamReader(connection.getInputStream()));
-        BufferedWriter output = 
-          new BufferedWriter(new OutputStreamWriter(out));
-        try {
-          String logData = null;
-          while ((logData = input.readLine()) != null) {
-            if (logData.length() > 0) {
-              output.write(taskId + ": " + logData + "\n");
-              output.flush();
-            }
-          }
-        } finally {
-            input.close();
-        }
-      }catch(IOException ioe){
-        LOG.warn("Error reading task output" + ioe.getMessage()); 
-      }
-    }    
-
-    static Configuration getConfiguration(String jobTrackerSpec)
-    {
-      Configuration conf = new Configuration();
-      if(jobTrackerSpec != null) {        
-        if(jobTrackerSpec.indexOf(":") >= 0) {
-          conf.set("mapred.job.tracker", jobTrackerSpec);
-        } else {
-          String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
-          URL validate = conf.getResource(classpathFile);
-          if(validate == null) {
-            throw new RuntimeException(classpathFile + " not found on CLASSPATH");
+        String logData = null;
+        while ((logData = input.readLine()) != null) {
+          if (logData.length() > 0) {
+            output.write(taskId + ": " + logData + "\n");
+            output.flush();
           }
-          conf.addFinalResource(classpathFile);
         }
+      } finally {
+        input.close();
       }
-      return conf;
+    }catch(IOException ioe){
+      LOG.warn("Error reading task output: " + ioe.getMessage()); 
     }
+  }    
 
-    /**
-     * Sets the output filter for tasks. only those tasks are printed whose
-     * output matches the filter. 
-     * @param newValue task filter.
-     */
-    @Deprecated
+  static Configuration getConfiguration(String jobTrackerSpec)
+  {
+    Configuration conf = new Configuration();
+    if(jobTrackerSpec != null) {        
+      if(jobTrackerSpec.indexOf(":") >= 0) {
+        conf.set("mapred.job.tracker", jobTrackerSpec);
+      } else {
+        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
+        URL validate = conf.getResource(classpathFile);
+        if(validate == null) {
+          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
+        }
+        conf.addFinalResource(classpathFile);
+      }
+    }
+    return conf;
+  }
+
+  /**
+   * Sets the output filter for tasks. Only those tasks are printed whose
+   * output matches the filter. 
+   * @param newValue task filter.
+   */
+  @Deprecated
     public void setTaskOutputFilter(TaskStatusFilter newValue){
-      this.taskOutputFilter = newValue ;
-    }
+    this.taskOutputFilter = newValue ;
+  }
     
-    /**
-     * Get the task output filter out of the JobConf
-     * @param job the JobConf to examine
-     * @return the filter level
-     */
-    public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
-      return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
-                                              "FAILED"));
-    }
-    
-    /**
-     * Modify the JobConf to set the task output filter
-     * @param job the JobConf to modify
-     * @param newValue the value to set
-     */
-    public static void setTaskOutputFilter(JobConf job, 
-                                           TaskStatusFilter newValue) {
-      job.set("jobclient.output.filter", newValue.toString());
-    }
-    
-    /**
-     * Returns task output filter.
-     * @return task filter. 
-     */
-    @Deprecated
+  /**
+   * Get the task output filter out of the JobConf
+   * @param job the JobConf to examine
+   * @return the filter level
+   */
+  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
+    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
+                                            "FAILED"));
+  }
+    
+  /**
+   * Modify the JobConf to set the task output filter
+   * @param job the JobConf to modify
+   * @param newValue the value to set
+   */
+  public static void setTaskOutputFilter(JobConf job, 
+                                         TaskStatusFilter newValue) {
+    job.set("jobclient.output.filter", newValue.toString());
+  }
+    
+  /**
+   * Returns task output filter.
+   * @return task filter. 
+   */
+  @Deprecated
     public TaskStatusFilter getTaskOutputFilter(){
-      return this.taskOutputFilter; 
-    }
+    return this.taskOutputFilter; 
+  }
     
-    public int run(String[] argv) throws Exception {
-        if (argv.length < 2) {
-            System.out.println("JobClient -submit <job> | -status <id> |" + 
-                               " -events <id> |" +
-                               " -kill <id> [-jt <jobtracker:port>|<config>]");
-            System.exit(-1);
-        }
+  public int run(String[] argv) throws Exception {
+    if (argv.length < 2) {
+      System.out.println("JobClient -submit <job> | -status <id> |" + 
+                         " -events <id> |" +
+                         " -kill <id> [-jt <jobtracker:port>|<config>]");
+      System.exit(-1);
+    }
 
-        // initialize JobClient
-        init();
+    // initialize JobClient
+    init();
         
-        // Process args
-        String submitJobFile = null;
-        String jobid = null;
-        boolean getStatus = false;
-        boolean killJob = false;
-
-        for (int i = 0; i < argv.length; i++) {
-            if ("-submit".equals(argv[i])) {
-                submitJobFile = argv[i+1];
-                i++;
-            } else if ("-status".equals(argv[i])) {
-                jobid = argv[i+1];
-                getStatus = true;
-                i++;
-            } else if ("-kill".equals(argv[i])) {
-                jobid = argv[i+1];
-                killJob = true;
-                i++;
-            } else if ("-events".equals(argv[i])) {
-              listEvents(argv[i+1], Integer.parseInt(argv[i+2]), 
-                         Integer.parseInt(argv[i+3]));
-              i += 3;
-            }
-        }
-
-        // Submit the request
-        int exitCode = -1;
-        try {
-            if (submitJobFile != null) {
-                RunningJob job = submitJob(submitJobFile);
-                System.out.println("Created job " + job.getJobID());
-            } else if (getStatus) {
-                RunningJob job = getJob(jobid);
-                if (job == null) {
-                    System.out.println("Could not find job " + jobid);
-                } else {
-                    System.out.println();
-                    System.out.println(job);
-                    exitCode = 0;
-                }
-            } else if (killJob) {
-                RunningJob job = getJob(jobid);
-                if (job == null) {
-                    System.out.println("Could not find job " + jobid);
-                } else {
-                    job.killJob();
-                    System.out.println("Killed job " + jobid);
-                    exitCode = 0;
-                }
-            }
-        } finally {
-            close();
-        }
-        return exitCode;
-    }
-    
-    /**
-     * List the events for the given job
-     * @param jobId the job id for the job's events to list
-     * @throws IOException
-     */
-    private void listEvents(String jobId, int fromEventId, int numEvents)
-    throws IOException {
-      TaskCompletionEvent[] events = 
-        jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
-      System.out.println("Task completion events for " + jobId);
-      System.out.println("Number of events (from " + fromEventId + 
-          ") are: " + events.length);
-      for(TaskCompletionEvent event: events) {
-        System.out.println(event.getTaskStatus() + " " + event.getTaskId() + 
-                           " " + event.getTaskTrackerHttp());
+    // Process args
+    String submitJobFile = null;
+    String jobid = null;
+    boolean getStatus = false;
+    boolean killJob = false;
+
+    for (int i = 0; i < argv.length; i++) {
+      if ("-submit".equals(argv[i])) {
+        submitJobFile = argv[i+1];
+        i++;
+      } else if ("-status".equals(argv[i])) {
+        jobid = argv[i+1];
+        getStatus = true;
+        i++;
+      } else if ("-kill".equals(argv[i])) {
+        jobid = argv[i+1];
+        killJob = true;
+        i++;
+      } else if ("-events".equals(argv[i])) {
+        listEvents(argv[i+1], Integer.parseInt(argv[i+2]), 
+                   Integer.parseInt(argv[i+3]));
+        i += 3;
       }
     }
-    
-    /**
-     */
-    public static void main(String argv[]) throws Exception {
-        int res = new JobClient().doMain(new Configuration(), argv);
-        System.exit(res);
-    }
+
+    // Submit the request
+    int exitCode = -1;
+    try {
+      if (submitJobFile != null) {
+        RunningJob job = submitJob(submitJobFile);
+        System.out.println("Created job " + job.getJobID());
+      } else if (getStatus) {
+        RunningJob job = getJob(jobid);
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else {
+          System.out.println();
+          System.out.println(job);
+          exitCode = 0;
+        }
+      } else if (killJob) {
+        RunningJob job = getJob(jobid);
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else {
+          job.killJob();
+          System.out.println("Killed job " + jobid);
+          exitCode = 0;
+        }
+      }
+    } finally {
+      close();
+    }
+    return exitCode;
+  }
+    
+  /**
+   * List the events for the given job
+   * @param jobId the job id for the job's events to list
+   * @throws IOException
+   */
+  private void listEvents(String jobId, int fromEventId, int numEvents)
+    throws IOException {
+    TaskCompletionEvent[] events = 
+      jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
+    System.out.println("Task completion events for " + jobId);
+    System.out.println("Number of events (from " + fromEventId + 
+                       ") are: " + events.length);
+    for(TaskCompletionEvent event: events) {
+      System.out.println(event.getTaskStatus() + " " + event.getTaskId() + 
+                         " " + event.getTaskTrackerHttp());
+    }
+  }
+    
+  /**
+   * Command-line entry point for the JobClient tool.
+   */
+  public static void main(String argv[]) throws Exception {
+    int res = new JobClient().doMain(new Configuration(), argv);
+    System.exit(res);
+  }
 }
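
For readers skimming the archive, a minimal driver sketch showing how the JobClient above is normally exercised: populate a JobConf and hand it to the static runJob() helper, which submits the job and polls progress as implemented in this change. This is illustrative only and not part of the commit; the driver class name, the input/output paths, and the JobConf(Class) constructor and path setters (setInputPath/setOutputPath, drawn from the contemporaneous mapred API) are assumptions, while JobConf, JobClient.runJob() and the jobclient.output.filter property come from the code above.

  // Illustrative driver, not part of this commit. Mapper and reducer default
  // to the identity classes, so this simply runs the framework end to end.
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;

  public class ExampleDriver {
    public static void main(String[] args) throws Exception {
      JobConf job = new JobConf(ExampleDriver.class);  // locate the job jar from this class
      job.setJobName("jobclient-example");             // otherwise submitJob() falls back to the jar name
      job.setInputPath(new Path(args[0]));             // assumed old-API JobConf path setters
      job.setOutputPath(new Path(args[1]));
      // runJob() submits the job, then polls once a second, logging map/reduce
      // progress and the logs of tasks matching jobclient.output.filter
      // (default "FAILED"), as shown in the diff above.
      JobClient.runJob(job);
    }
  }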
 


