hadoop-common-commits mailing list archives

From cnaur...@apache.org
Subject svn commit: r1495297 [45/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Date Fri, 21 Jun 2013 06:37:39 GMT
Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputFormat.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputFormat.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputFormat.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+
+/**
+ * DynamicInputFormat implements the "Worker pattern" for DistCp.
+ * Rather than splitting the copy-list into a set of static splits,
+ * the DynamicInputFormat does the following:
+ * 1. Splits the copy-list into small chunks on the DFS.
+ * 2. Creates a set of empty "dynamic" splits, each of which consumes as many
+ *    chunks as it can.
+ * This arrangement ensures that a single slow mapper won't slow down the entire
+ * job (since the slack will be picked up by other mappers, which consume more
+ * chunks).
+ * By varying the split-ratio, one can vary chunk sizes to achieve different
+ * performance characteristics.
+ */
+public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
+  private static final Log LOG = LogFactory.getLog(DynamicInputFormat.class);
+
+  private static final String CONF_LABEL_LISTING_SPLIT_RATIO
+          = "mapred.listing.split.ratio";
+  private static final String CONF_LABEL_NUM_SPLITS
+          = "mapred.num.splits";
+  private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
+          = "mapred.num.entries.per.chunk";
+
+  /**
+   * Implementation of InputFormat::getSplits(). This method splits up the
+   * copy-listing file into chunks, and assigns the first batch to different
+   * tasks.
+   * @param jobContext JobContext for the map job.
+   * @return The list of (empty) dynamic input-splits.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    LOG.info("DynamicInputFormat: Getting splits for job:"
+             + jobContext.getJobID());
+    return createSplits(jobContext,
+                        splitCopyListingIntoChunksWithShuffle(jobContext));
+  }
+
+  private List<InputSplit> createSplits(JobContext jobContext,
+                                        List<DynamicInputChunk> chunks)
+          throws IOException {
+    int numMaps = getNumMapTasks(jobContext.getConfiguration());
+
+    final int nSplits = Math.min(numMaps, chunks.size());
+    List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);
+    
+    for (int i=0; i< nSplits; ++i) {
+      TaskID taskId = new TaskID(jobContext.getJobID(), true, i);
+      chunks.get(i).assignTo(taskId);
+      splits.add(new FileSplit(chunks.get(i).getPath(), 0,
+          // Setting non-zero length for FileSplit size, to avoid a possible
+          // future when 0-sized file-splits are considered "empty" and skipped
+          // over.
+          MIN_RECORDS_PER_CHUNK,
+          null));
+    }
+    DistCpUtils.publish(jobContext.getConfiguration(),
+                        CONF_LABEL_NUM_SPLITS, splits.size());
+    return splits;
+  }
+
+  private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
+
+  private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle
+                                    (JobContext context) throws IOException {
+
+    final Configuration configuration = context.getConfiguration();
+    int numRecords = getNumberOfRecords(configuration);
+    int numMaps = getNumMapTasks(configuration);
+    // Number of chunks each map will process, on average.
+    int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
+    validateNumChunksUsing(splitRatio, numMaps);
+
+    int numEntriesPerChunk = (int)Math.ceil((float)numRecords
+                                          /(splitRatio * numMaps));
+    DistCpUtils.publish(context.getConfiguration(),
+                        CONF_LABEL_NUM_ENTRIES_PER_CHUNK,
+                        numEntriesPerChunk);
+
+    final int nChunksTotal = (int)Math.ceil((float)numRecords/numEntriesPerChunk);
+    int nChunksOpenAtOnce
+            = Math.min(N_CHUNKS_OPEN_AT_ONCE_DEFAULT, nChunksTotal);
+
+    Path listingPath = getListingFilePath(configuration);
+    SequenceFile.Reader reader = new SequenceFile.Reader(
+        listingPath.getFileSystem(configuration), listingPath, configuration);
+
+    List<DynamicInputChunk> openChunks
+                  = new ArrayList<DynamicInputChunk>();
+    
+    List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
+
+    FileStatus fileStatus = new FileStatus();
+    Text relPath = new Text();
+    int recordCounter = 0;
+    int chunkCount = 0;
+
+    try {
+
+      while (reader.next(relPath, fileStatus)) {
+        if (recordCounter % (nChunksOpenAtOnce*numEntriesPerChunk) == 0) {
+          // All chunks full. Create new chunk-set.
+          closeAll(openChunks);
+          chunksFinal.addAll(openChunks);
+
+          openChunks = createChunks(
+                  configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
+
+          chunkCount += openChunks.size();
+
+          nChunksOpenAtOnce = openChunks.size();
+          recordCounter = 0;
+        }
+
+        // Shuffle into open chunks.
+        openChunks.get(recordCounter%nChunksOpenAtOnce).write(relPath, fileStatus);
+        ++recordCounter;
+      }
+
+    } finally {
+      closeAll(openChunks);
+      chunksFinal.addAll(openChunks);
+      IOUtils.closeStream(reader);
+    }
+
+    LOG.info("Number of dynamic-chunk-files created: " + chunksFinal.size()); 
+    return chunksFinal;
+  }
+
+  private static void validateNumChunksUsing(int splitRatio, int numMaps)
+                                              throws IOException {
+    if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
+      throw new IOException("Too many chunks created with splitRatio:"
+                 + splitRatio + ", numMaps:" + numMaps
+                 + ". Reduce numMaps or decrease split-ratio to proceed.");
+  }
+
+  private static void closeAll(List<DynamicInputChunk> chunks) {
+    for (DynamicInputChunk chunk: chunks)
+      chunk.close();
+  }
+
+  private static List<DynamicInputChunk> createChunks(Configuration config,
+                      int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
+                                          throws IOException {
+    List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
+    int chunkIdUpperBound
+            = Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
+
+    // If there will be fewer than nChunksOpenAtOnce chunks left after
+    // the current batch of chunks, fold the remaining chunks into
+    // the current batch.
+    if (nChunksTotal - chunkIdUpperBound < nChunksOpenAtOnce)
+      chunkIdUpperBound = nChunksTotal;
+
+    for (int i=chunkCount; i < chunkIdUpperBound; ++i)
+      chunks.add(createChunk(i, config));
+    return chunks;
+  }
+
+  private static DynamicInputChunk createChunk(int chunkId, Configuration config)
+                                              throws IOException {
+    return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
+                                              config);
+  }
+
+
+  private static Path getListingFilePath(Configuration configuration) {
+    String listingFilePathString = configuration.get(
+            DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+
+    assert !listingFilePathString.equals("") : "Listing file not found.";
+
+    Path listingFilePath = new Path(listingFilePathString);
+    try {
+      assert listingFilePath.getFileSystem(configuration)
+              .exists(listingFilePath) : "Listing file: " + listingFilePath +
+                                          " not found.";
+    } catch (IOException e) {
+      assert false :   "Listing file: " + listingFilePath
+                    + " couldn't be accessed. " + e.getMessage();
+    }
+    return listingFilePath;
+  }
+
+  private static int getNumberOfRecords(Configuration configuration) {
+    return DistCpUtils.getInt(configuration,
+                              DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
+  }
+
+  private static int getNumMapTasks(Configuration configuration) {
+    return DistCpUtils.getNumMapTasks(configuration);
+  }
+
+  private static int getListingSplitRatio(Configuration configuration,
+                                            int numMaps, int numPaths) {
+    return configuration.getInt(
+            CONF_LABEL_LISTING_SPLIT_RATIO,
+            getSplitRatio(numMaps, numPaths));
+  }
+
+  private static final int MAX_CHUNKS_TOLERABLE = 400;
+  private static final int MAX_CHUNKS_IDEAL     = 100;
+  private static final int MIN_RECORDS_PER_CHUNK = 5;
+  private static final int SPLIT_RATIO_DEFAULT  = 2;
+
+  /**
+   * Package private, for testability.
+   * @param nMaps The number of maps requested.
+   * @param nRecords The number of records to be copied.
+   * @return The number of chunks each map should ideally handle.
+   */
+  static int getSplitRatio(int nMaps, int nRecords) {
+    if (nMaps == 1) {
+      LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
+      return 1;
+    }
+
+    if (nMaps > MAX_CHUNKS_IDEAL)
+      return SPLIT_RATIO_DEFAULT;
+
+    int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
+    int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
+
+    return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
+              SPLIT_RATIO_DEFAULT : nPickups;
+  }
+
+  static int getNumEntriesPerChunk(Configuration configuration) {
+    return DistCpUtils.getInt(configuration,
+                              CONF_LABEL_NUM_ENTRIES_PER_CHUNK);
+  }
+
+
+  /**
+   * Implementation of InputFormat::createRecordReader().
+   * @param inputSplit The split for which the RecordReader is required.
+   * @param taskAttemptContext TaskAttemptContext for the current attempt.
+   * @return DynamicRecordReader instance.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public RecordReader<K, V> createRecordReader(
+          InputSplit inputSplit,
+          TaskAttemptContext taskAttemptContext)
+          throws IOException, InterruptedException {
+    return new DynamicRecordReader<K, V>();
+  }
+}
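
A minimal, self-contained sketch (illustration only, not part of the committed file) that mirrors the arithmetic of getSplitRatio() and the per-chunk record count computed in getSplits()/splitCopyListingIntoChunksWithShuffle(); the sample numbers (20 maps, 10,000 copy-listing records) are assumptions.

public class SplitRatioSketch {
  // Constants copied from DynamicInputFormat above.
  static final int MAX_CHUNKS_IDEAL       = 100;
  static final int MIN_RECORDS_PER_CHUNK  = 5;
  static final int SPLIT_RATIO_DEFAULT    = 2;

  // Same logic as DynamicInputFormat.getSplitRatio().
  static int getSplitRatio(int nMaps, int nRecords) {
    if (nMaps == 1) return 1;
    if (nMaps > MAX_CHUNKS_IDEAL) return SPLIT_RATIO_DEFAULT;
    int nPickups = (int) Math.ceil((float) MAX_CHUNKS_IDEAL / nMaps);
    int nRecordsPerChunk = (int) Math.ceil((float) nRecords / (nMaps * nPickups));
    return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ? SPLIT_RATIO_DEFAULT : nPickups;
  }

  public static void main(String[] args) {
    int nMaps = 20, nRecords = 10000;                   // assumed sample numbers
    int splitRatio = getSplitRatio(nMaps, nRecords);    // ceil(100/20) = 5 pickups per map
    int entriesPerChunk = (int) Math.ceil((float) nRecords
        / (splitRatio * nMaps));                        // ceil(10000/100) = 100 records per chunk
    System.out.println("splitRatio=" + splitRatio
        + ", entriesPerChunk=" + entriesPerChunk);
  }
}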

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicRecordReader.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicRecordReader.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicRecordReader.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The DynamicRecordReader is used in conjunction with the DynamicInputFormat
+ * to implement the "Worker pattern" for DistCp.
+ * The DynamicRecordReader is responsible for:
+ * 1. Presenting the contents of each chunk to DistCp's mapper.
+ * 2. Transparently acquiring a new chunk when the current chunk has been
+ *    completely consumed.
+ */
+public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
+  private static final Log LOG = LogFactory.getLog(DynamicRecordReader.class);
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration configuration;
+  private DynamicInputChunk<K, V> chunk;
+  private TaskID taskId;
+
+  // Data required for progress indication.
+  private int numRecordsPerChunk; // Constant per job.
+  private int totalNumRecords;    // Constant per job.
+  private int numRecordsProcessedByThisMap = 0;
+  private long timeOfLastChunkDirScan = 0;
+  private boolean isChunkDirAlreadyScanned = false;
+
+  private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
+
+  /**
+   * Implementation for RecordReader::initialize(). Initializes the internal
+   * RecordReader to read from chunks.
+   * @param inputSplit The InputSplit for the map. Ignored entirely.
+   * @param taskAttemptContext The AttemptContext.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public void initialize(InputSplit inputSplit,
+                         TaskAttemptContext taskAttemptContext)
+                         throws IOException, InterruptedException {
+    numRecordsPerChunk = DynamicInputFormat.getNumEntriesPerChunk(
+            taskAttemptContext.getConfiguration());
+    this.taskAttemptContext = taskAttemptContext;
+    configuration = taskAttemptContext.getConfiguration();
+    taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
+    chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
+    timeOfLastChunkDirScan = System.currentTimeMillis();
+    isChunkDirAlreadyScanned = false;
+
+    totalNumRecords = getTotalNumRecords();
+
+  }
+
+  private int getTotalNumRecords() {
+    return DistCpUtils.getInt(configuration,
+                              DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
+  }
+
+  /**
+   * Implementation of RecordReader::nextKeyValue().
+   * Reads the contents of the current chunk and returns them. When a chunk has
+   * been completely exhausted, a new chunk is acquired and read,
+   * transparently.
+   * @return True if a next key/value pair was read; false otherwise.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public boolean nextKeyValue()
+      throws IOException, InterruptedException {
+
+    if (chunk == null) {
+      if (LOG.isDebugEnabled())
+        LOG.debug(taskId + ": RecordReader is null. No records to be read.");
+      return false;
+    }
+
+    if (chunk.getReader().nextKeyValue()) {
+      ++numRecordsProcessedByThisMap;
+      return true;
+    }
+
+    if (LOG.isDebugEnabled())
+      LOG.debug(taskId + ": Current chunk exhausted. " +
+                         " Attempting to pick up new one.");
+
+    chunk.release();
+    timeOfLastChunkDirScan = System.currentTimeMillis();
+    isChunkDirAlreadyScanned = false;
+    
+    chunk = DynamicInputChunk.acquire(taskAttemptContext);
+
+    if (chunk == null) return false;
+
+    if (chunk.getReader().nextKeyValue()) {
+      ++numRecordsProcessedByThisMap;
+      return true;
+    }
+    else {
+      return false;
+    }
+  }
+
+  /**
+   * Implementation of RecordReader::getCurrentKey().
+   * @return The key of the current record. (i.e. the source-path.)
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public K getCurrentKey()
+      throws IOException, InterruptedException {
+    return chunk.getReader().getCurrentKey();
+  }
+
+  /**
+   * Implementation of RecordReader::getCurrentValue().
+   * @return The value of the current record. (i.e. the target-path.)
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public V getCurrentValue()
+      throws IOException, InterruptedException {
+    return chunk.getReader().getCurrentValue();
+  }
+
+  /**
+   * Implementation of RecordReader::getProgress().
+   * @return A fraction [0.0,1.0] indicating the progress of a DistCp mapper.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public float getProgress()
+      throws IOException, InterruptedException {
+    final int numChunksLeft = getNumChunksLeft();
+    if (numChunksLeft < 0) {// Un-initialized. i.e. Before 1st dir-scan.
+      assert numRecordsProcessedByThisMap <= numRecordsPerChunk
+              : "numRecordsProcessedByThisMap:" + numRecordsProcessedByThisMap +
+                " exceeds numRecordsPerChunk:" + numRecordsPerChunk;
+      return ((float) numRecordsProcessedByThisMap) / totalNumRecords;
+      // Conservative estimate, till the first directory scan.
+    }
+
+    return ((float) numRecordsProcessedByThisMap)
+            /(numRecordsProcessedByThisMap + numRecordsPerChunk*numChunksLeft);
+  }
+
+  private int getNumChunksLeft() throws IOException {
+    long now = System.currentTimeMillis();
+    boolean tooLongSinceLastDirScan
+                  = now - timeOfLastChunkDirScan > TIME_THRESHOLD_FOR_DIR_SCANS;
+
+    if (tooLongSinceLastDirScan
+            || (!isChunkDirAlreadyScanned &&
+                    numRecordsProcessedByThisMap%numRecordsPerChunk
+                              > numRecordsPerChunk/2)) {
+      DynamicInputChunk.getListOfChunkFiles();
+      isChunkDirAlreadyScanned = true;
+      timeOfLastChunkDirScan = now;
+    }
+
+    return DynamicInputChunk.getNumChunksLeft();
+  }
+  /**
+   * Implementation of RecordReader::close().
+   * Closes the RecordReader.
+   * @throws IOException, on failure.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    if (chunk != null)
+        chunk.close();
+  }
+}
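
A minimal sketch (illustration only, not part of the committed file) of the progress estimate used in DynamicRecordReader.getProgress() once the chunk directory has been scanned; all numbers below are assumptions.

public class ProgressEstimateSketch {
  public static void main(String[] args) {
    int numRecordsProcessedByThisMap = 250;  // assumed: records this map has copied so far
    int numRecordsPerChunk           = 100;  // assumed: constant per job
    int numChunksLeft                = 7;    // assumed: result of the last chunk-directory scan
    float progress = ((float) numRecordsProcessedByThisMap)
        / (numRecordsProcessedByThisMap + numRecordsPerChunk * numChunksLeft);
    System.out.println("progress = " + progress);  // 250 / 950 ~= 0.26
  }
}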

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/DistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/DistCpUtils.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/DistCpUtils.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/DistCpUtils.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.distcp2.mapred.UniformSizeInputFormat;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.mapreduce.InputFormat;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Locale;
+import java.text.DecimalFormat;
+import java.net.URI;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Utility functions used in DistCp.
+ */
+public class DistCpUtils {
+
+  private static final Log LOG = LogFactory.getLog(DistCpUtils.class);
+
+  /**
+   * Retrieves size of the file at the specified path.
+   * @param path The path of the file whose size is sought.
+   * @param configuration Configuration, to retrieve the appropriate FileSystem.
+   * @return The file-size, in number of bytes.
+   * @throws IOException, on failure.
+   */
+  public static long getFileSize(Path path, Configuration configuration)
+                                            throws IOException {
+    if (LOG.isDebugEnabled())
+      LOG.debug("Retrieving file size for: " + path);
+    return path.getFileSystem(configuration).getFileStatus(path).getLen();
+  }
+
+  /**
+   * Utility to publish a value to a configuration.
+   * @param configuration The Configuration to which the value must be written.
+   * @param label The label for the value being published.
+   * @param value The value being published.
+   * @param <T> The type of the value.
+   */
+  public static <T> void publish(Configuration configuration,
+                                 String label, T value) {
+    configuration.set(label, String.valueOf(value));
+  }
+
+  /**
+   * Utility to retrieve a specified key from a Configuration. Throw exception
+   * if not found.
+   * @param configuration The Configuration in which the key is sought.
+   * @param label The key being sought.
+   * @return Integer value of the key.
+   */
+  public static int getInt(Configuration configuration, String label) {
+    int value = configuration.getInt(label, -1);
+    assert value >= 0 : "Couldn't find " + label;
+    return value;
+  }
+
+  public static int getNumMapTasks(Configuration configuration) {
+    return getInt(configuration, "mapred.map.tasks");
+  }
+
+  /**
+   * Utility to retrieve a specified key from a Configuration. Throw exception
+   * if not found.
+   * @param configuration The Configuration in which the key is sought.
+   * @param label The key being sought.
+   * @return Long value of the key.
+   */
+  public static long getLong(Configuration configuration, String label) {
+    long value = configuration.getLong(label, -1);
+    assert value >= 0 : "Couldn't find " + label;
+    return value;
+  }
+
+  /**
+   * Returns the class that implements a copy strategy. Looks up the implementation for
+   * a particular strategy from distcp-default.xml
+   *
+   * @param conf - Configuration object
+   * @param options - Handle to input options
+   * @return Class implementing the strategy specified in options.
+   */
+  public static Class<? extends InputFormat> getStrategy(Configuration conf,
+                                                                 DistCpOptions options) {
+    String confLabel = "distcp." +
+        options.getCopyStrategy().toLowerCase(Locale.getDefault()) + ".strategy.impl";
+    return conf.getClass(confLabel, UniformSizeInputFormat.class, InputFormat.class);
+  }
+
+  /**
+   * Gets the relative path of a child path with respect to a root path.
+   * For example, if childPath = /tmp/abc/xyz/file and
+   *                 sourceRootPath = /tmp/abc,
+   * the relative path would be /xyz/file.
+   *              If childPath = /file and
+   *                 sourceRootPath = /,
+   * the relative path would be /file.
+   * @param sourceRootPath - Source root path
+   * @param childPath - Path for which the relative path is required
+   * @return - Relative portion of the child path (always prefixed with /,
+   *           unless it is empty)
+   */
+  public static String getRelativePath(Path sourceRootPath, Path childPath) {
+    String childPathString = childPath.toUri().getPath();
+    String sourceRootPathString = sourceRootPath.toUri().getPath();
+    return sourceRootPathString.equals("/") ? childPathString :
+        childPathString.substring(sourceRootPathString.length());
+  }
+
+  /**
+   * Pack file preservation attributes into a string, containing
+   * just the first character of each preservation attribute
+   * @param attributes - Attribute set to preserve
+   * @return - String containing first letters of each attribute to preserve
+   */
+  public static String packAttributes(EnumSet<FileAttribute> attributes) {
+    StringBuffer buffer = new StringBuffer(5);
+    int len = 0;
+    for (FileAttribute attribute : attributes) {
+      buffer.append(attribute.name().charAt(0));
+      len++;
+    }
+    return buffer.substring(0, len);
+  }
+
+  /**
+   * Unpacks a preservation-attribute string, containing the first character of
+   * each preservation attribute, back into a set of attributes to preserve
+   * @param attributes - Attribute string
+   * @return - Attribute set
+   */
+  public static EnumSet<FileAttribute> unpackAttributes(String attributes) {
+    EnumSet<FileAttribute> retValue = EnumSet.noneOf(FileAttribute.class);
+
+    if (attributes != null) {
+      for (int index = 0; index < attributes.length(); index++) {
+        retValue.add(FileAttribute.getAttribute(attributes.charAt(index)));
+      }
+    }
+
+    return retValue;
+  }
+
+  /**
+   * Preserves attributes on a file, matching those of the file status passed
+   * as argument. Barring the block size, all other attributes are preserved
+   * by this function.
+   *
+   * @param targetFS - File system
+   * @param path - Path that needs to preserve original file status
+   * @param srcFileStatus - Original file status
+   * @param attributes - Attribute set that needs to be preserved
+   * @throws IOException - Exception if any (particularly relating to group/owner
+   *                       change or any transient error)
+   */
+  public static void preserve(FileSystem targetFS, Path path,
+                              FileStatus srcFileStatus,
+                              EnumSet<FileAttribute> attributes) throws IOException {
+
+    FileStatus targetFileStatus = targetFS.getFileStatus(path);
+    String group = targetFileStatus.getGroup();
+    String user = targetFileStatus.getOwner();
+    boolean chown = false;
+
+    if (attributes.contains(FileAttribute.PERMISSION) &&
+      !srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
+      targetFS.setPermission(path, srcFileStatus.getPermission());
+    }
+
+    if (attributes.contains(FileAttribute.REPLICATION) && ! targetFileStatus.isDir() &&
+        srcFileStatus.getReplication() != targetFileStatus.getReplication()) {
+      targetFS.setReplication(path, srcFileStatus.getReplication());
+    }
+
+    if (attributes.contains(FileAttribute.GROUP) &&
+            !group.equals(srcFileStatus.getGroup())) {
+      group = srcFileStatus.getGroup();
+      chown = true;
+    }
+
+    if (attributes.contains(FileAttribute.USER) &&
+            !user.equals(srcFileStatus.getOwner())) {
+      user = srcFileStatus.getOwner();
+      chown = true;
+    }
+
+    if (chown) {
+      targetFS.setOwner(path, user, group);
+    }
+  }
+
+  /**
+   * Sorts a sequence file containing Text and FileStatus as key and value respectively.
+   *
+   * @param fs - File System
+   * @param conf - Configuration
+   * @param sourceListing - Source listing file
+   * @return Path of the sorted file: the source file with "_sorted" appended to the name.
+   * @throws IOException - Any exception during sort.
+   */
+  public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
+      throws IOException {
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, FileStatus.class, conf);
+    Path output = new Path(sourceListing.toString() +  "_sorted");
+
+    if (fs.exists(output)) {
+      fs.delete(output, false);
+    }
+
+    sorter.sort(sourceListing, output);
+    return output;
+  }
+
+  /**
+   * String utility to convert a number-of-bytes to human readable format.
+   */
+  private static ThreadLocal<DecimalFormat> FORMATTER
+                        = new ThreadLocal<DecimalFormat>() {
+    @Override
+    protected DecimalFormat initialValue() {
+      return new DecimalFormat("0.0");
+    }
+  };
+
+  public static DecimalFormat getFormatter() {
+    return FORMATTER.get();
+  }
+
+  public static String getStringDescriptionFor(long nBytes) {
+
+    char units [] = {'B', 'K', 'M', 'G', 'T', 'P'};
+
+    double current = nBytes;
+    double prev    = current;
+    int index = 0;
+
+    while ((current = current/1024) >= 1) {
+      prev = current;
+      ++index;
+    }
+
+    assert index < units.length : "Too large a number.";
+
+    return getFormatter().format(prev) + units[index];
+  }
+
+  /**
+   * Utility to compare checksums for the paths specified.
+   *
+   * If a checksum can't be retrieved, the comparison doesn't fail.
+   * The only time the comparison fails is when both checksums are
+   * available and they don't match.
+   *
+   * @param sourceFS FileSystem for the source path.
+   * @param source The source path.
+   * @param targetFS FileSystem for the target path.
+   * @param target The target path.
+   * @return If either checksum couldn't be retrieved, the function returns
+   * true. If both checksums are retrieved, the function returns true if they
+   * match, and false otherwise.
+   * @throws IOException if there's an exception while retrieving checksums.
+   */
+  public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
+                                   FileSystem targetFS, Path target)
+                                   throws IOException {
+    FileChecksum sourceChecksum = null;
+    FileChecksum targetChecksum = null;
+    try {
+      sourceChecksum = sourceFS.getFileChecksum(source);
+      targetChecksum = targetFS.getFileChecksum(target);
+    } catch (IOException e) {
+      LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
+    }
+    return (sourceChecksum == null || targetChecksum == null ||
+            sourceChecksum.equals(targetChecksum));
+  }
+
+  /**
+   * See if two file systems are the same or not.
+   */
+  public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+    URI srcUri = srcFs.getUri();
+    URI dstUri = destFs.getUri();
+    if (srcUri.getScheme() == null) {
+      return false;
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false;
+    }
+    String srcHost = srcUri.getHost();
+    String dstHost = dstUri.getHost();
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+      } catch(UnknownHostException ue) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Could not compare file-systems. Unknown host: ", ue);
+        return false;
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false;
+      }
+    }
+    else if (srcHost == null && dstHost != null) {
+      return false;
+    }
+    else if (srcHost != null) {
+      return false;
+    }
+
+    //check for ports
+
+    return srcUri.getPort() == dstUri.getPort();
+  }
+}
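
A minimal sketch (illustration only, not part of the committed file) of how packAttributes()/unpackAttributes() round-trip a preservation-attribute set through its one-letter encoding; the particular attributes chosen are assumptions, and the letter order of the packed string follows the FileAttribute enum declaration.

import java.util.EnumSet;

import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.distcp2.util.DistCpUtils;

public class AttributePackingSketch {
  public static void main(String[] args) {
    // Preserve user, group and permissions (an assumed selection).
    EnumSet<FileAttribute> toPreserve = EnumSet.of(
        FileAttribute.USER, FileAttribute.GROUP, FileAttribute.PERMISSION);

    String packed = DistCpUtils.packAttributes(toPreserve);         // one letter per attribute, e.g. "UGP"
    EnumSet<FileAttribute> unpacked = DistCpUtils.unpackAttributes(packed);

    System.out.println(packed + " -> " + unpacked);                 // unpacked set equals toPreserve
  }
}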

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/RetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/RetriableCommand.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/RetriableCommand.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/RetriableCommand.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents commands that can be retried on failure, in a configurable
+ * manner.
+ */
+public abstract class RetriableCommand {
+
+  private static Log LOG = LogFactory.getLog(RetriableCommand.class);
+
+  private static final long DELAY_MILLISECONDS = 500;
+  private static final int  MAX_RETRIES        = 3;
+
+  private RetryPolicy retryPolicy = RetryPolicies.
+      exponentialBackoffRetry(MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
+  protected String description;
+
+  /**
+   * Constructor.
+   * @param description The human-readable description of the command.
+   */
+  public RetriableCommand(String description) {
+    this.description = description;
+  }
+
+  /**
+   * Constructor.
+   * @param description The human-readable description of the command.
+   * @param retryPolicy The RetryPolicy to be used to compute retries.
+   */
+  public RetriableCommand(String description, RetryPolicy retryPolicy) {
+    this(description);
+    setRetryPolicy(retryPolicy);
+  }
+
+  /**
+   * Implement this method to define the command-logic that will be
+   * retried on failure (i.e. on Exception).
+   * @param arguments Argument-list to the command.
+   * @return Generic "Object".
+   * @throws Exception Throws Exception on complete failure.
+   */
+  protected abstract Object doExecute(Object... arguments) throws Exception;
+
+  /**
+   * The execute() method invokes doExecute() until either:
+   *  1. doExecute() succeeds, or
+   *  2. the command may no longer be retried (e.g. runs out of retry-attempts).
+   * @param arguments The list of arguments for the command.
+   * @return Generic "Object" from doExecute(), on success.
+   * @throws IOException, on complete failure.
+   */
+  public Object execute(Object... arguments) throws Exception {
+    Exception latestException;
+    int counter = 0;
+    do {
+      try {
+        return doExecute(arguments);
+      } catch(Exception exception) {
+        LOG.error("Failure in Retriable command: " + description, exception);
+        latestException = exception;
+      }
+      counter++;
+    } while (retryPolicy.shouldRetry(latestException, counter));
+
+    throw new IOException("Couldn't run retriable-command: " + description,
+                          latestException);
+  }
+
+  /**
+   * Fluent-interface to change the RetryPolicy.
+   * @param retryHandler The new RetryPolicy instance to be used.
+   * @return Self.
+   */
+  public RetriableCommand setRetryPolicy(RetryPolicy retryHandler) {
+    this.retryPolicy = retryHandler;
+    return this;
+  }
+}
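
A minimal sketch (illustration only, not part of the committed file) of how a command might extend RetriableCommand: doExecute() holds the logic to be retried, and execute() drives the retries using the default exponential-backoff policy. The FlakyTouch class and its simulated transient failures are assumptions made purely for illustration.

import java.io.IOException;

import org.apache.hadoop.tools.distcp2.util.RetriableCommand;

public class FlakyTouch extends RetriableCommand {
  private int attempts = 0;

  public FlakyTouch() {
    super("flaky-touch");                       // description used in logs and failure messages
  }

  @Override
  protected Object doExecute(Object... arguments) throws Exception {
    if (++attempts < 3) {                       // simulate two transient failures
      throw new IOException("transient failure #" + attempts);
    }
    return "succeeded on attempt " + attempts;  // third attempt succeeds
  }

  public static void main(String[] args) throws Exception {
    // execute() retries doExecute() until it succeeds or retries are exhausted.
    System.out.println(new FlakyTouch().execute());
  }
}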

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/ThrottledInputStream.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/ThrottledInputStream.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/util/ThrottledInputStream.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The ThrottledInputStream provides bandwidth throttling on a specified
+ * InputStream. It is implemented as a wrapper on top of another InputStream
+ * instance.
+ * The throttling works by examining the number of bytes read from the underlying
+ * InputStream from the beginning, and sleep()ing for a time interval if
+ * the byte-transfer is found to exceed the specified tolerable maximum.
+ * (Thus, while the read-rate might exceed the maximum for a given short interval,
+ * the average tends towards the specified maximum, overall.)
+ */
+public class ThrottledInputStream extends InputStream {
+
+  private final InputStream rawStream;
+  private final long maxBytesPerSec;
+  private final long startTime = System.currentTimeMillis();
+
+  private long bytesRead = 0;
+  private long totalSleepTime = 0;
+
+  private static final long SLEEP_DURATION_MS = 50;
+
+  public ThrottledInputStream(InputStream rawStream) {
+    this(rawStream, Long.MAX_VALUE);
+  }
+
+  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
+    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 
+    this.rawStream = rawStream;
+    this.maxBytesPerSec = maxBytesPerSec;
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawStream.close();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read() throws IOException {
+    throttle();
+    int data = rawStream.read();
+    if (data != -1) {
+      bytesRead++;
+    }
+    return data;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read(byte[] b) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b, off, len);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  private void throttle() throws IOException {
+    if (getBytesPerSec() > maxBytesPerSec) {
+      try {
+        Thread.sleep(SLEEP_DURATION_MS);
+        totalSleepTime += SLEEP_DURATION_MS;
+      } catch (InterruptedException e) {
+        throw new IOException("Thread aborted", e);
+      }
+    }
+  }
+
+  /**
+   * Getter for the number of bytes read from this stream, since creation.
+   * @return The number of bytes.
+   */
+  public long getTotalBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Getter for the read-rate from this stream, since creation.
+   * Calculated as bytesRead/elapsedTimeSinceStart.
+   * @return Read rate, in bytes/sec.
+   */
+  public long getBytesPerSec() {
+    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
+    if (elapsed == 0) {
+      return bytesRead;
+    } else {
+      return bytesRead / elapsed;
+    }
+  }
+
+  /**
+   * Getter for the total time spent in sleep.
+   * @return Number of milliseconds spent in sleep.
+   */
+  public long getTotalSleepTime() {
+    return totalSleepTime;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    return "ThrottledInputStream{" +
+        "bytesRead=" + bytesRead +
+        ", maxBytesPerSec=" + maxBytesPerSec +
+        ", bytesPerSec=" + getBytesPerSec() +
+        ", totalSleepTime=" + totalSleepTime +
+        '}';
+  }
+}
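
A minimal sketch (illustration only, not part of the committed file) of wrapping a plain FileInputStream in a ThrottledInputStream to cap the average read rate; the file path and the 1 MB/s limit are assumptions.

import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.hadoop.tools.distcp2.util.ThrottledInputStream;

public class ThrottledReadSketch {
  public static void main(String[] args) throws Exception {
    long maxBytesPerSec = 1024 * 1024;                // cap the average read rate at ~1 MB/s
    InputStream in = new ThrottledInputStream(
        new FileInputStream("/tmp/sample-input"),     // assumed input file
        maxBytesPerSec);
    byte[] buffer = new byte[64 * 1024];
    long total = 0;
    try {
      int n;
      while ((n = in.read(buffer)) != -1) {           // each read() throttles first, then reads
        total += n;
      }
    } finally {
      in.close();
    }
    System.out.println("Read " + total + " bytes, throttled to "
        + maxBytesPerSec + " bytes/sec");
  }
}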

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java Fri Jun 21 06:37:27 2013
@@ -66,6 +66,11 @@ public class Hadoop20JHParser implements
     reader = new LineReader(input);
   }
 
+  public Hadoop20JHParser(LineReader reader) throws IOException {
+    super();
+    this.reader = reader;
+  }
+
   Map<String, HistoryEventEmitter> liveEmitters =
       new HashMap<String, HistoryEventEmitter>();
   Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Fri Jun 21 06:37:27 2013
@@ -66,6 +66,26 @@ public class Job20LineHistoryEventEmitte
       String user = line.get("USER");
       String jobName = line.get("JOBNAME");
       String queueName = line.get("JOB_QUEUE");
+      String workflowId = line.get("WORKFLOW_ID");
+      if (workflowId == null) {
+        workflowId = "";
+      }
+      String workflowName = line.get("WORKFLOW_NAME");
+      if (workflowName == null) {
+        workflowName = "";
+      }
+      String workflowNodeName = line.get("WORKFLOW_NODE_NAME");
+      if (workflowNodeName == null) {
+        workflowNodeName = "";
+      }
+      String workflowAdjacencies = line.get("WORKFLOW_ADJACENCIES");
+      if (workflowAdjacencies == null) {
+        workflowAdjacencies = "";
+      }
+      String workflowTags = line.get("WORKFLOW_TAGS");
+      if (workflowTags == null) {
+        workflowTags = "";
+      }
 
       if (submitTime != null) {
         Job20LineHistoryEventEmitter that =
@@ -75,8 +95,10 @@ public class Job20LineHistoryEventEmitte
 
         Map<JobACL, AccessControlList> jobACLs =
           new HashMap<JobACL, AccessControlList>();
-        return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
-            : user, that.originalSubmitTime, jobConf, jobACLs, queueName);
+        return new JobSubmittedEvent(jobID, jobName,
+            user == null ? "nulluser" : user, that.originalSubmitTime,
+            jobConf, jobACLs, queueName, workflowId, workflowName,
+            workflowNodeName, workflowAdjacencies, workflowTags);
       }
 
       return null;

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java Fri Jun 21 06:37:27 2013
@@ -38,31 +38,53 @@ public class JobSubmittedEvent implement
   private String jobConfPath;
   private Map<JobACL, AccessControlList> jobAcls;
   private String queue;
+  private String workflowId;
+  private String workflowName;
+  private String workflowNodeName;
+  private String workflowAdjacencies;
+  private String workflowTags;
 
   /**
    * @deprecated Use
    *             {@link #JobSubmittedEvent(JobID, String, String, long, String,
-   *             Map, String)}
+   *             Map, String, String, String, String, String, String)}
    *             instead.
    */
   @Deprecated
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath) {
     this(id, jobName, userName, submitTime, jobConfPath,
-        new HashMap<JobACL, AccessControlList>(), null);
+        new HashMap<JobACL, AccessControlList>(), null, "", "", "", "");
   }
 
   /**
    * @deprecated Use
    *             {@link #JobSubmittedEvent(JobID, String, String, long, String,
-   *             Map, String)}
+   *             Map, String, String, String, String, String, String)}
    *             instead.
    */
   @Deprecated
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
       Map<JobACL, AccessControlList> jobACLs) {
-    this(id, jobName, userName, submitTime, jobConfPath, jobACLs, null);
+    this(id, jobName, userName, submitTime, jobConfPath, jobACLs, null,
+        "", "", "", "");
+  }
+
+  /**
+   * @deprecated Use
+   *             {@link #JobSubmittedEvent(JobID, String, String, long, String,
+   *             Map, String, String, String, String, String, String)}
+   *             instead.
+   */
+  @Deprecated
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath,
+      Map<JobACL, AccessControlList> jobACLs, String queue,
+      String workflowId, String workflowName, String workflowNodeName,
+      String workflowAdjacencies) {
+    this(id, jobName, userName, submitTime, jobConfPath, jobACLs, queue,
+        workflowId, workflowName, workflowNodeName, workflowAdjacencies, "");
   }
 
   /**
@@ -74,10 +96,17 @@ public class JobSubmittedEvent implement
    * @param jobConfPath Path of the Job Configuration file
    * @param jobACLs The configured acls for the job.
    * @param queue job queue name
+   * @param workflowId the workflow Id
+   * @param workflowName the workflow name
+   * @param workflowNodeName the workflow node name
+   * @param workflowAdjacencies the workflow adjacencies
+   * @param workflowTags Comma-separated workflow tags
    */
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
-      Map<JobACL, AccessControlList> jobACLs, String queue) {
+      Map<JobACL, AccessControlList> jobACLs, String queue,
+      String workflowId, String workflowName, String workflowNodeName,
+      String workflowAdjacencies, String workflowTags) {
     this.jobId = id;
     this.jobName = jobName;
     this.userName = userName;
@@ -85,6 +114,11 @@ public class JobSubmittedEvent implement
     this.jobConfPath = jobConfPath;
     this.jobAcls = jobACLs;
     this.queue = queue;
+    this.workflowId = workflowId;
+    this.workflowName = workflowName;
+    this.workflowNodeName = workflowNodeName;
+    this.workflowAdjacencies = workflowAdjacencies;
+    this.workflowTags = workflowTags;
   }
 
   /** Get the Job Id */
@@ -101,10 +135,30 @@ public class JobSubmittedEvent implement
   public Map<JobACL, AccessControlList> getJobAcls() {
     return jobAcls;
   }
-
+  /** Get the job queue name */
   public String getJobQueueName() {
     return queue;
   }
+  /** Get the workflow Id */
+  public String getWorkflowId() {
+    return workflowId;
+  }
+  /** Get the workflow name */
+  public String getWorkflowName() {
+    return workflowName;
+  }
+  /** Get the workflow node name */
+  public String getWorkflowNodeName() {
+    return workflowNodeName;
+  }
+  /** Get the workflow adjacencies */
+  public String getWorkflowAdjacencies() {
+    return workflowAdjacencies;
+  }
+  /** Get the workflow tags */
+  public String getWorkflowTags() {
+    return workflowTags;
+  }
 
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java Fri Jun 21 06:37:27 2013
@@ -61,6 +61,14 @@ public abstract class TaskAttempt20LineE
       String taskType = line.get("TASK_TYPE");
       String trackerName = line.get("TRACKER_NAME");
       String httpPort = line.get("HTTP_PORT");
+      String locality = line.get("LOCALITY");
+      if (locality == null) {
+        locality = "";
+      }
+      String avataar = line.get("AVATAAR");
+      if (avataar == null) {
+        avataar = "";
+      }
 
       if (startTime != null && taskType != null) {
         TaskAttempt20LineEventEmitter that =
@@ -75,7 +83,8 @@ public abstract class TaskAttempt20LineE
                 .parseInt(httpPort);
 
         return new TaskAttemptStartedEvent(taskAttemptID,
-            that.originalTaskType, that.originalStartTime, trackerName, port);
+            that.originalTaskType, that.originalStartTime, trackerName, port,
+            locality, avataar);
       }
 
       return null;

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java Fri Jun 21 06:37:27 2013
@@ -84,7 +84,13 @@ public class TaskAttemptFinishedEvent  i
   public JhCounters getCounters() { return counters; }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_FINISHED;
+    if (taskType == TaskType.JOB_SETUP) {
+      return EventType.SETUP_ATTEMPT_FINISHED;
+    } else if (taskType == TaskType.JOB_CLEANUP) {
+      return EventType.CLEANUP_ATTEMPT_FINISHED;
+    }
+    return attemptId.isMap() ? 
+        EventType.MAP_ATTEMPT_FINISHED : EventType.REDUCE_ATTEMPT_FINISHED;
   }
 
 }

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java Fri Jun 21 06:37:27 2013
@@ -35,6 +35,8 @@ public class TaskAttemptStartedEvent imp
   private TaskType taskType;
   private String trackerName;
   private int httpPort;
+  private String locality;
+  private String avataar;
 
   /**
    * Create an event to record the start of an attempt
@@ -43,16 +45,20 @@ public class TaskAttemptStartedEvent imp
    * @param startTime Start time of the attempt
    * @param trackerName Name of the Task Tracker where attempt is running
    * @param httpPort The port number of the tracker
+   * @param locality the locality of the task attempt
+   * @param avataar the avataar of the task attempt
    */
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,  
       TaskType taskType, long startTime, String trackerName,
-      int httpPort) {
+      int httpPort, String locality, String avataar) {
     this.taskId = attemptId.getTaskID();
     this.attemptId = attemptId;
     this.startTime = startTime;
     this.taskType = taskType;
     this.trackerName = trackerName;
     this.httpPort = httpPort;
+    this.locality = locality;
+    this.avataar = avataar;
   }
 
   /** Get the task id */
@@ -71,9 +77,23 @@ public class TaskAttemptStartedEvent imp
   public TaskAttemptID getTaskAttemptId() {
     return attemptId;
   }
+  /** Get the locality of the task attempt */
+  public String getLocality() {
+    return locality;
+  }
+  /** Get the avataar of the task attempt */
+  public String getAvataar() {
+    return avataar;
+  }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_STARTED;
+    if (taskType == TaskType.JOB_SETUP) {
+      return EventType.SETUP_ATTEMPT_STARTED;
+    } else if (taskType == TaskType.JOB_CLEANUP) {
+      return EventType.CLEANUP_ATTEMPT_STARTED;
+    }
+    return attemptId.isMap() ? 
+        EventType.MAP_ATTEMPT_STARTED : EventType.REDUCE_ATTEMPT_STARTED;
   }
 
 }

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java Fri Jun 21 06:37:27 2013
@@ -77,7 +77,23 @@ public class TaskAttemptUnsuccessfulComp
   public String getTaskStatus() { return status; }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_KILLED;
+    if (status.equals("FAILED")) {
+      if (taskType == TaskType.JOB_SETUP) {
+        return EventType.SETUP_ATTEMPT_FAILED;
+      } else if (taskType == TaskType.JOB_CLEANUP) {
+        return EventType.CLEANUP_ATTEMPT_FAILED;
+      }
+      return attemptId.isMap() ? 
+          EventType.MAP_ATTEMPT_FAILED : EventType.REDUCE_ATTEMPT_FAILED;
+    } else {
+      if (taskType == TaskType.JOB_SETUP) {
+        return EventType.SETUP_ATTEMPT_KILLED;
+      } else if (taskType == TaskType.JOB_CLEANUP) {
+        return EventType.CLEANUP_ATTEMPT_KILLED;
+      }
+      return attemptId.isMap() ? 
+          EventType.MAP_ATTEMPT_KILLED : EventType.REDUCE_ATTEMPT_KILLED;
+    }
   }
 
 }
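
The unsuccessful-completion event applies the same task-type dispatch, with the recorded status string selecting between the *_FAILED and *_KILLED variants. A hypothetical helper (not part of the Rumen API) restating the mapping introduced above:

    // Hypothetical restatement of the mapping above, using the same
    // EventType/TaskType constants referenced by the event classes.
    static EventType unsuccessfulEventType(String status, TaskType taskType,
                                           boolean isMap) {
      boolean failed = "FAILED".equals(status);
      if (taskType == TaskType.JOB_SETUP) {
        return failed ? EventType.SETUP_ATTEMPT_FAILED
                      : EventType.SETUP_ATTEMPT_KILLED;
      } else if (taskType == TaskType.JOB_CLEANUP) {
        return failed ? EventType.CLEANUP_ATTEMPT_FAILED
                      : EventType.CLEANUP_ATTEMPT_KILLED;
      }
      return isMap ? (failed ? EventType.MAP_ATTEMPT_FAILED
                             : EventType.MAP_ATTEMPT_KILLED)
                   : (failed ? EventType.REDUCE_ATTEMPT_FAILED
                             : EventType.REDUCE_ATTEMPT_KILLED);
    }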

Modified: hadoop/common/branches/branch-1-win/src/webapps/datanode/browseBlock.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/datanode/browseBlock.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/datanode/browseBlock.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/datanode/browseBlock.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/datanode/browseDirectory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/datanode/browseDirectory.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/datanode/browseDirectory.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/datanode/browseDirectory.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/datanode/tail.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/datanode/tail.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/datanode/tail.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/datanode/tail.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.*"

Added: hadoop/common/branches/branch-1-win/src/webapps/hdfs/corrupt_files.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/hdfs/corrupt_files.jsp?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/hdfs/corrupt_files.jsp (added)
+++ hadoop/common/branches/branch-1-win/src/webapps/hdfs/corrupt_files.jsp Fri Jun 21 06:37:27 2013
@@ -0,0 +1,79 @@
+
+<%
+  /*
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+%>
+<%@ page contentType="text/html; charset=UTF-8"
+	import="org.apache.hadoop.util.ServletUtil"
+	import="org.apache.hadoop.fs.FileStatus"
+	import="org.apache.hadoop.fs.FileUtil"
+	import="org.apache.hadoop.fs.Path"
+	import="java.util.Collection"
+	import="java.util.Arrays" %>
+<%!//for java.io.Serializable
+  private static final long serialVersionUID = 1L;%>
+<%
+  NameNode nn = (NameNode) application.getAttribute("name.node");
+  FSNamesystem fsn = nn.getNamesystem();
+  // String namenodeRole = nn.getRole().toString();
+  String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":"
+      + nn.getNameNodeAddress().getPort();
+  Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = 
+	fsn.listCorruptFileBlocks();
+  int corruptFileCount = corruptFileBlocks.size();
+
+%>
+
+<html>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+<title>Hadoop <%=namenodeLabel%></title>
+<body>
+<h1>'<%=namenodeLabel%>'</h1>
+<%=JspHelper.getVersionTable(fsn)%>
+<br>
+<b><a href="/nn_browsedfscontent.jsp">Browse the filesystem</a></b>
+<br>
+<b><a href="/logs/"><%=namenodeLabel%> Logs</a></b>
+<br>
+<b><a href="/dfshealth.jsp">Go back to DFS home</a></b>
+<hr>
+<h3>Reported Corrupt Files</h3>
+<%
+  if (corruptFileCount == 0) {
+%>
+    <i>No missing blocks found at the moment.</i> <br>
+    Please run fsck for a thorough health analysis.
+<%
+  } else {
+    for (FSNamesystem.CorruptFileBlockInfo c : corruptFileBlocks) {
+      String currentFileBlock = c.toString();
+%>
+      <%=currentFileBlock%><br>
+<%
+    }
+%>
+    <p>
+      <b>Total:</b> At least <%=corruptFileCount%> corrupt file(s)
+    </p>
+<%
+  }
+%>
+
+<%
+  out.println(ServletUtil.htmlFooter());
+%>

Modified: hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfshealth.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfshealth.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   isThreadSafe="false"
@@ -262,7 +277,10 @@
 <h3>Cluster Summary</h3>
 <b> <%= jspHelper.getSafeModeText()%> </b>
 <b> <%= jspHelper.getInodeLimitText()%> </b>
-<a class="warning"> <%= JspHelper.getWarningText(fsn)%></a>
+<a class="warning" href="/corrupt_files.jsp" title="List corrupt files">
+  <%= JspHelper.getWarningText(fsn)%>
+</a>
+
 
 <%
     generateDFSHealthReport(out, nn, request); 

Modified: hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfsnodelist.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfsnodelist.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfsnodelist.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/hdfs/dfsnodelist.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
     contentType="text/html; charset=UTF-8"
     isThreadSafe="false"

Modified: hadoop/common/branches/branch-1-win/src/webapps/hdfs/index.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/hdfs/index.html?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/hdfs/index.html (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/hdfs/index.html Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<!--
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
 <meta HTTP-EQUIV="REFRESH" content="0;url=dfshealth.jsp"/>
 <html>
 

Modified: hadoop/common/branches/branch-1-win/src/webapps/hdfs/nn_browsedfscontent.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/hdfs/nn_browsedfscontent.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/hdfs/nn_browsedfscontent.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/hdfs/nn_browsedfscontent.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="java.io.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/analysejobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/analysejobhistory.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/analysejobhistory.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/analysejobhistory.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.http.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/jobconf_history.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/jobconf_history.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/jobconf_history.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/jobconf_history.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/jobdetailshistory.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/jobdetailshistory.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/jobdetailshistory.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.http.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/jobhistoryhome.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/jobhistoryhome.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/jobhistoryhome.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/jobhistoryhome.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="java.io.*"
@@ -90,7 +105,8 @@ window.location.href = url;
 
     final String soughtDate = dateSplit.length > 1 ? dateSplit[1] : "";
 
-    final String parts[] = dateSplit[0].split(":");
+    final String parts[] = dateSplit.length >= 1 ? dateSplit[0].split(":")
+                                                 : new String[0];
 
     final String rawUser = (parts.length >= 1)
                             ? parts[0].toLowerCase()
@@ -137,11 +153,24 @@ window.location.href = url;
                  || fileName.split("_")[FILENAME_JOBNAME_PART].toLowerCase()
                        .contains(jobnameKeywordInFname);
       }
+      
+      private boolean isHistoryFile(String fileName) {
+        String[] tokens = null;
+        try {
+          String dp = JobHistory.JobInfo.decodeJobHistoryFileName(fileName);
+          tokens = dp.split("_");
+        } catch (IOException ioe) {
+        }
+	    
+        return tokens != null && !fileName.endsWith(".xml") && tokens.length > 3
+            && tokens[1].matches("\\d+")  && tokens[2].matches("\\d+")
+            && tokens[3].matches("\\d+");
+      }
 
       public boolean accept(Path path) {
         String name = path.getName();
 
-        return !(name.endsWith(".xml")) && matchUser(name) && matchJobName(name);
+        return isHistoryFile(name) && matchUser(name) && matchJobName(name);
       }
     };
     
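
For context, the new isHistoryFile() filter above deliberately swallows the IOException from decodeJobHistoryFileName: a name that cannot be decoded leaves tokens null, so the file is rejected. Besides excluding *.xml (job conf) files, the filter only requires a particular token shape; a minimal sketch with made-up tokens rather than a real history file name:

    // Illustrative only: more than three underscore-separated tokens,
    // with tokens 1 through 3 purely numeric.
    String[] tokens = { "host", "1371796647000", "201306210637", "0001", "user" };
    boolean looksLikeHistory = tokens.length > 3
        && tokens[1].matches("\\d+")
        && tokens[2].matches("\\d+")
        && tokens[3].matches("\\d+");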

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/jobtaskshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/jobtaskshistory.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/jobtaskshistory.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/jobtaskshistory.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.http.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/legacyjobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/legacyjobhistory.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/legacyjobhistory.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/legacyjobhistory.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="java.io.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/loadhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/loadhistory.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/loadhistory.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/loadhistory.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="org.apache.hadoop.mapred.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/history/taskdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/history/taskdetailshistory.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/history/taskdetailshistory.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/history/taskdetailshistory.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.http.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/job/index.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/job/index.html?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/job/index.html (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/job/index.html Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<!--
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
 <meta HTTP-EQUIV="REFRESH" content="0;url=jobtracker.jsp"/>
 <html>
 

Modified: hadoop/common/branches/branch-1-win/src/webapps/job/jobblacklistedtrackers.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/job/jobblacklistedtrackers.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/job/jobblacklistedtrackers.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/job/jobblacklistedtrackers.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.*"

Modified: hadoop/common/branches/branch-1-win/src/webapps/job/jobconf.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/webapps/job/jobconf.jsp?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/webapps/job/jobconf.jsp (original)
+++ hadoop/common/branches/branch-1-win/src/webapps/job/jobconf.jsp Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+<%!
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+%>
 <%@ page
   contentType="text/html; charset=UTF-8"
   import="javax.servlet.*"


