hadoop-common-commits mailing list archives

From ste...@apache.org
Subject svn commit: r783055 [5/6] - in /hadoop/core/branches/HADOOP-3628-2: ./ .eclipse.templates/ ivy/ lib/ lib/jsp-2.1/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-schedu...
Date Tue, 09 Jun 2009 16:11:23 GMT
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Jun  9 16:11:19 2009
@@ -87,6 +87,12 @@
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
+  // Limits on concurrent running tasks per-node and cluster-wide
+  private int maxMapsPerNode;
+  private int maxReducesPerNode;
+  private int runningMapLimit;
+  private int runningReduceLimit;
+  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -257,6 +263,11 @@
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+    this.maxMapsPerNode = conf.getMaxMapsPerNode();
+    this.maxReducesPerNode = conf.getMaxReducesPerNode();
+    this.runningMapLimit = conf.getRunningMapLimit();
+    this.runningReduceLimit = conf.getRunningReduceLimit();
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
@@ -575,6 +586,14 @@
     return inputLength;
   }
  
+  boolean isCleanupLaunched() {
+    return launchedCleanup;
+  }
+
+  boolean isSetupLaunched() {
+    return launchedSetup;
+  }
+
   /**
    * Get the list of map tasks
    * @return the raw array of maps for this job
@@ -1658,6 +1677,11 @@
                                           final int numUniqueHosts,
                                           final int maxCacheLevel,
                                           final double avgProgress) {
+    if (numMapTasks == 0) {
+      LOG.info("No maps to schedule for " + profile.getJobID());
+      return -1;
+    }
+
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
@@ -1666,6 +1690,10 @@
     //
     this.clusterSize = clusterSize;
 
+    if (!belowRunningTaskLimit(tts, true)) {
+      return -1;
+    }
+    
     if (!shouldRunOnTaskTracker(taskTracker)) {
       return -1;
     }
@@ -1863,11 +1891,20 @@
                                              int clusterSize,
                                              int numUniqueHosts,
                                              double avgProgress) {
+    if (numReduceTasks == 0) {
+      LOG.info("No reduces to schedule for " + profile.getJobID());
+      return -1;
+    }
+
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
-    
+
     // Update the last-known clusterSize
     this.clusterSize = clusterSize;
+    
+    if (!belowRunningTaskLimit(tts, false)) {
+      return -1;
+    }
 
     if (!shouldRunOnTaskTracker(taskTracker)) {
       return -1;
@@ -1921,6 +1958,42 @@
     }
     return true;
   }
+  
+  /**
+   * Check whether we are below the running task limits (per node and cluster
+   * wide) for a given type of task on a given task tracker.
+   * 
+   * @param tts task tracker to check on
+   * @param map true if looking at map tasks, false for reduce tasks
+   * @return true if we are below both the cluster-wide and the per-node 
+   *         running task limit for the given type of task
+   */
+  private boolean belowRunningTaskLimit(TaskTrackerStatus tts, boolean map) {
+    int runningTasks = map ? runningMapTasks : runningReduceTasks;
+    int clusterLimit = map ? runningMapLimit : runningReduceLimit;
+    int perNodeLimit = map ? maxMapsPerNode  : maxReducesPerNode;
+    
+    // Check cluster-wide limit
+    if (clusterLimit != -1 && runningTasks >= clusterLimit) {
+      return false;
+    }
+    
+    // Check per-node limit
+    if (perNodeLimit != -1) {
+      int runningTasksOnNode = 0;
+      for (TaskStatus ts: tts.getTaskReports()) {
+        if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() == map &&
+            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+          runningTasksOnNode++;
+        }
+      }
+      if (runningTasksOnNode >= perNodeLimit) {
+        return false;
+      }
+    }
+    
+    return true;
+  }
 
   /**
    * A taskid assigned to this JobInProgress has reported in successfully.

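The hunks above add per-node and cluster-wide caps on concurrently running tasks; the map and reduce scheduling paths now bail out early via belowRunningTaskLimit() before handing out work. The following is a minimal standalone sketch of the same two-level check, with illustrative class and parameter names (a limit of -1 means unlimited, as in the patch):

// Minimal sketch of the two-level check behind belowRunningTaskLimit().
// Names are illustrative; -1 means "no limit".
public class RunningTaskLimiter {
  private final int clusterLimit;   // cf. runningMapLimit / runningReduceLimit
  private final int perNodeLimit;   // cf. maxMapsPerNode / maxReducesPerNode

  public RunningTaskLimiter(int clusterLimit, int perNodeLimit) {
    this.clusterLimit = clusterLimit;
    this.perNodeLimit = perNodeLimit;
  }

  /**
   * @param runningInCluster tasks of this type the job is running cluster-wide
   * @param runningOnNode    tasks of this type the job is running on the node
   * @return true if another task of this type may be scheduled on that node
   */
  public boolean belowLimits(int runningInCluster, int runningOnNode) {
    if (clusterLimit != -1 && runningInCluster >= clusterLimit) {
      return false;                 // cluster-wide cap reached
    }
    if (perNodeLimit != -1 && runningOnNode >= perNodeLimit) {
      return false;                 // per-node cap reached
    }
    return true;
  }
}

For example, with a per-node limit of 2 and no cluster-wide limit, belowLimits(40, 2) returns false, so the scheduler would return -1 exactly as the hunks above do.
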
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Jun  9 16:11:19 2009
@@ -118,6 +118,9 @@
   // The maximum number of blacklists for a tracker after which the 
   // tracker could be blacklisted across all jobs
   private int MAX_BLACKLISTS_PER_TRACKER = 4;
+  // Approximate number of heartbeats that could arrive at the
+  // JobTracker in a second
+  private int NUM_HEARTBEATS_IN_SECOND = 100;
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
@@ -1447,6 +1450,10 @@
   Map<String, Set<JobID>> trackerToJobsToCleanup = 
     new HashMap<String, Set<JobID>>();
   
+  // (trackerID --> list of tasks to cleanup)
+  Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup = 
+    new HashMap<String, Set<TaskAttemptID>>();
+  
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -1577,6 +1584,8 @@
     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);
+    NUM_HEARTBEATS_IN_SECOND = 
+        conf.getInt("mapred.heartbeats.in.second", 100);
 
     //This configuration is there solely for tuning purposes and 
     //once this feature has been tested in real clusters and an appropriate
@@ -2786,7 +2795,7 @@
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
                                 (int)(1000 * Math.ceil((double)clusterSize / 
-                                                       CLUSTER_INCREMENT)),
+                                                       NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
   }
@@ -2930,8 +2939,8 @@
                                                               String taskTracker) {
     
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
+    List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
     if (taskIds != null) {
-      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
       for (TaskAttemptID killTaskId : taskIds) {
         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
         if (tip == null) {
@@ -2949,10 +2958,18 @@
           }
         }
       }
-            
-      return killList;
     }
-    return null;
+    
+    // add the stray attempts for uninited jobs
+    synchronized (trackerToTasksToCleanup) {
+      Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker);
+      if (set != null) {
+        for (TaskAttemptID id : set) {
+          killList.add(new KillTaskAction(id));
+        }
+      }
+    }
+    return killList;
   }
 
   /**
@@ -3638,6 +3655,19 @@
         continue;
       }
       
+      if (!job.inited()) {
+        // if job is not yet initialized ... kill the attempt
+        synchronized (trackerToTasksToCleanup) {
+          Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);
+          if (tasks == null) {
+            tasks = new HashSet<TaskAttemptID>();
+            trackerToTasksToCleanup.put(trackerName, tasks);
+          }
+          tasks.add(taskId);
+        }
+        continue;
+      }
+
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
@@ -3702,6 +3732,10 @@
       trackerToJobsToCleanup.remove(trackerName);
     }
     
+    synchronized (trackerToTasksToCleanup) {
+      trackerToTasksToCleanup.remove(trackerName);
+    }
+    
     // Inform the recovery manager
     recoveryManager.unMarkTracker(trackerName);
     

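The new trackerToTasksToCleanup map collects stray attempts reported for jobs that have not yet initialized and hands them back as KillTaskAction entries on the tracker's next heartbeat. The pattern is an accumulate-then-drain map keyed by tracker name; here is a small self-contained sketch, with plain strings standing in for tracker names and TaskAttemptIDs:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Accumulate-then-drain pattern behind trackerToTasksToCleanup.
public class PendingKills {
  private final Map<String, Set<String>> pending =
      new HashMap<String, Set<String>>();

  // Called when a status report arrives for a job that is not yet initialized.
  public synchronized void add(String tracker, String attemptId) {
    Set<String> set = pending.get(tracker);
    if (set == null) {
      set = new HashSet<String>();
      pending.put(tracker, set);
    }
    set.add(attemptId);
  }

  // Called while building the heartbeat response for a tracker: remove and
  // return everything queued for it (empty list if nothing is pending).
  public synchronized List<String> drain(String tracker) {
    Set<String> set = pending.remove(tracker);
    return set == null ? new ArrayList<String>() : new ArrayList<String>(set);
  }
}

Draining with remove() keeps the map from growing once a tracker has been told to kill its stray attempts, and when a tracker is lost its entry is simply removed outright, as in the last hunk above.
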
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MRConstants.java Tue Jun  9 16:11:19 2009
@@ -27,8 +27,6 @@
   //
   public static final int HEARTBEAT_INTERVAL_MIN = 3 * 1000;
   
-  public static final int CLUSTER_INCREMENT = 100;
-
   public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
 
   //

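The JobTracker change above replaces the fixed CLUSTER_INCREMENT (deleted here) with the configurable mapred.heartbeats.in.second when sizing the heartbeat interval. A small sketch of the same arithmetic, with HEARTBEAT_INTERVAL_MIN copied from MRConstants:

// Heartbeat interval as computed by getNextHeartbeatInterval(), in isolation.
public class HeartbeatMath {
  static final int HEARTBEAT_INTERVAL_MIN = 3 * 1000;  // from MRConstants

  static int nextHeartbeatInterval(int clusterSize, int heartbeatsPerSecond) {
    return Math.max(
        (int) (1000 * Math.ceil((double) clusterSize / heartbeatsPerSecond)),
        HEARTBEAT_INTERVAL_MIN);
  }
}

With the default of 100 heartbeats per second, a 250-node cluster computes ceil(250/100) = 3 seconds and so stays at the 3000 ms floor, while a 1000-node cluster moves to a 10 second interval.
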
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Jun  9 16:11:19 2009
@@ -592,8 +592,8 @@
     
   }
 
-  class MapOutputBuffer<K extends Object, V extends Object> 
-  implements MapOutputCollector<K, V>, IndexedSortable {
+  class MapOutputBuffer<K extends Object, V extends Object>
+      implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
     private final Partitioner<K, V> partitioner;
     private final JobConf job;
@@ -635,6 +635,8 @@
     private volatile Throwable sortSpillException = null;
     private final int softRecordLimit;
     private final int softBufferLimit;
+    private int recordRemaining;
+    private int bufferRemaining;
     private final int minSpillsForCombine;
     private final IndexedSorter sorter;
     private final ReentrantLock spillLock = new ReentrantLock();
@@ -682,8 +684,8 @@
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
       }
-      sorter = ReflectionUtils.newInstance(
-            job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
+      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
+            QuickSort.class, IndexedSorter.class), job);
       LOG.info("io.sort.mb = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
@@ -696,6 +698,8 @@
       kvindices = new int[recordCapacity * ACCTSIZE];
       softBufferLimit = (int)(kvbuffer.length * spillper);
       softRecordLimit = (int)(kvoffsets.length * spillper);
+      recordRemaining = softRecordLimit;
+      bufferRemaining = softBufferLimit;
       LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
@@ -763,38 +767,52 @@
                               + value.getClass().getName());
       }
       final int kvnext = (kvindex + 1) % kvoffsets.length;
-      spillLock.lock();
-      try {
-        boolean kvfull;
-        do {
-          if (sortSpillException != null) {
-            throw (IOException)new IOException("Spill failed"
-                ).initCause(sortSpillException);
-          }
-          // sufficient acct space
-          kvfull = kvnext == kvstart;
-          final boolean kvsoftlimit = ((kvnext > kvend)
-              ? kvnext - kvend > softRecordLimit
-              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
-          if (kvstart == kvend && kvsoftlimit) {
-            LOG.info("Spilling map output: record full = " + kvsoftlimit);
-            startSpill();
-          }
-          if (kvfull) {
-            try {
-              while (kvstart != kvend) {
-                reporter.progress();
-                spillDone.await();
+      if (--recordRemaining <= 0) {
+        // The counter may remain below zero if the soft limit is still in
+        // force but cannot be satisfied because a spill is in progress
+        spillLock.lock();
+        try {
+          boolean kvfull;
+          do {
+            if (sortSpillException != null) {
+              throw (IOException)new IOException("Spill failed"
+                  ).initCause(sortSpillException);
+            }
+            // sufficient acct space
+            kvfull = kvnext == kvstart;
+            final boolean kvsoftlimit = ((kvnext > kvend)
+                ? kvnext - kvend > softRecordLimit
+                : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+            if (kvstart == kvend && kvsoftlimit) {
+              LOG.info("Spilling map output: record full = " + kvfull);
+              startSpill();
+            }
+            if (kvfull) {
+              try {
+                while (kvstart != kvend) {
+                  reporter.progress();
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
+                throw (IOException)new IOException(
+                    "Collector interrupted while waiting for the writer"
+                    ).initCause(e);
               }
-            } catch (InterruptedException e) {
-              throw (IOException)new IOException(
-                  "Collector interrupted while waiting for the writer"
-                  ).initCause(e);
             }
-          }
-        } while (kvfull);
-      } finally {
-        spillLock.unlock();
+          } while (kvfull);
+          final int softOff = kvend + softRecordLimit;
+          recordRemaining = Math.min(
+              // out of acct space
+              (kvnext < kvstart
+                 ? kvstart - kvnext
+                 : kvoffsets.length - kvnext + kvstart),
+              // soft limit
+              (kvend < kvnext
+                 ? softOff - kvnext
+                 : kvnext + (softOff - kvoffsets.length)));
+        } finally {
+          spillLock.unlock();
+        }
       }
 
       try {
@@ -905,7 +923,7 @@
        * likely result in data loss or corruption.
        * @see #markRecord()
        */
-      protected synchronized void reset() throws IOException {
+      protected void reset() throws IOException {
         // spillLock unnecessary; If spill wraps, then
         // bufindex < bufstart < bufend so contention is impossible
         // a stale value for bufstart does not affect correctness, since
@@ -931,7 +949,7 @@
       private final byte[] scratch = new byte[1];
 
       @Override
-      public synchronized void write(int v)
+      public void write(int v)
           throws IOException {
         scratch[0] = (byte)v;
         write(scratch, 0, 1);
@@ -945,69 +963,86 @@
        *    deserialize into the collection buffer.
        */
       @Override
-      public synchronized void write(byte b[], int off, int len)
+      public void write(byte b[], int off, int len)
           throws IOException {
         boolean buffull = false;
         boolean wrap = false;
-        spillLock.lock();
-        try {
-          do {
-            if (sortSpillException != null) {
-              throw (IOException)new IOException("Spill failed"
-                  ).initCause(sortSpillException);
-            }
+        bufferRemaining -= len;
+        if (bufferRemaining <= 0) {
+          // writing these bytes could exhaust available buffer space
+          // check if spill or blocking is necessary
+          spillLock.lock();
+          try {
+            do {
+              if (sortSpillException != null) {
+                throw (IOException)new IOException("Spill failed"
+                    ).initCause(sortSpillException);
+              }
 
-            // sufficient buffer space?
-            if (bufstart <= bufend && bufend <= bufindex) {
-              buffull = bufindex + len > bufvoid;
-              wrap = (bufvoid - bufindex) + bufstart > len;
-            } else {
-              // bufindex <= bufstart <= bufend
-              // bufend <= bufindex <= bufstart
-              wrap = false;
-              buffull = bufindex + len > bufstart;
-            }
+              // sufficient buffer space?
+              if (bufstart <= bufend && bufend <= bufindex) {
+                buffull = bufindex + len > bufvoid;
+                wrap = (bufvoid - bufindex) + bufstart > len;
+              } else {
+                // bufindex <= bufstart <= bufend
+                // bufend <= bufindex <= bufstart
+                wrap = false;
+                buffull = bufindex + len > bufstart;
+              }
 
-            if (kvstart == kvend) {
-              // spill thread not running
-              if (kvend != kvindex) {
-                // we have records we can spill
-                final boolean bufsoftlimit = (bufindex > bufend)
-                  ? bufindex - bufend > softBufferLimit
-                  : bufend - bufindex < bufvoid - softBufferLimit;
-                if (bufsoftlimit || (buffull && !wrap)) {
-                  LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
-                  startSpill();
+              if (kvstart == kvend) {
+                // spill thread not running
+                if (kvend != kvindex) {
+                  // we have records we can spill
+                  final boolean bufsoftlimit = (bufindex > bufend)
+                    ? bufindex - bufend > softBufferLimit
+                    : bufend - bufindex < bufvoid - softBufferLimit;
+                  if (bufsoftlimit || (buffull && !wrap)) {
+                    LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
+                    startSpill();
+                  }
+                } else if (buffull && !wrap) {
+                  // We have no buffered records, and this record is too large
+                  // to write into kvbuffer. We must spill it directly from
+                  // collect
+                  final int size = ((bufend <= bufindex)
+                    ? bufindex - bufend
+                    : (bufvoid - bufend) + bufindex) + len;
+                  bufstart = bufend = bufindex = bufmark = 0;
+                  kvstart = kvend = kvindex = 0;
+                  bufvoid = kvbuffer.length;
+                  throw new MapBufferTooSmallException(size + " bytes");
                 }
-              } else if (buffull && !wrap) {
-                // We have no buffered records, and this record is too large
-                // to write into kvbuffer. We must spill it directly from
-                // collect
-                final int size = ((bufend <= bufindex)
-                  ? bufindex - bufend
-                  : (bufvoid - bufend) + bufindex) + len;
-                bufstart = bufend = bufindex = bufmark = 0;
-                kvstart = kvend = kvindex = 0;
-                bufvoid = kvbuffer.length;
-                throw new MapBufferTooSmallException(size + " bytes");
               }
-            }
 
-            if (buffull && !wrap) {
-              try {
-                while (kvstart != kvend) {
-                  reporter.progress();
-                  spillDone.await();
+              if (buffull && !wrap) {
+                try {
+                  while (kvstart != kvend) {
+                    reporter.progress();
+                    spillDone.await();
+                  }
+                } catch (InterruptedException e) {
+                    throw (IOException)new IOException(
+                        "Buffer interrupted while waiting for the writer"
+                        ).initCause(e);
                 }
-              } catch (InterruptedException e) {
-                  throw (IOException)new IOException(
-                      "Buffer interrupted while waiting for the writer"
-                      ).initCause(e);
               }
-            }
-          } while (buffull && !wrap);
-        } finally {
-          spillLock.unlock();
+            } while (buffull && !wrap);
+            final int softOff = bufend + softBufferLimit;
+            bufferRemaining = Math.min(
+                // out of buffer space
+                (bufindex < bufstart
+                   ? bufstart - bufindex
+                   : kvbuffer.length - bufindex + bufstart),
+                // soft limit
+                (bufend < bufindex
+                   ? softOff - bufindex
+                   : bufindex + (softOff - kvbuffer.length)));
+          } finally {
+            spillLock.unlock();
+          }
+        } else {
+          buffull = bufindex + len > bufvoid;
         }
         // here, we know that we have sufficient space to write
         if (buffull) {
@@ -1019,11 +1054,12 @@
         }
         System.arraycopy(b, off, kvbuffer, bufindex, len);
         bufindex += len;
+        bufferRemaining -= len;
       }
     }
 
-    public synchronized void flush() throws IOException, ClassNotFoundException,
-                                            InterruptedException {
+    public void flush() throws IOException, ClassNotFoundException,
+           InterruptedException {
       LOG.info("Starting flush of map output");
       spillLock.lock();
       try {
@@ -1103,7 +1139,7 @@
       }
     }
 
-    private synchronized void startSpill() {
+    private void startSpill() {
       LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
                "; bufvoid = " + bufvoid);
       LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
@@ -1434,7 +1470,7 @@
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         keyClass, valClass,
+                         keyClass, valClass, codec,
                          segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,

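Most of the MapOutputBuffer changes above serve one optimization: instead of taking spillLock on every collect() and every Buffer.write(), the recordRemaining and bufferRemaining counters are decremented on the fast path, and the lock is only taken when a counter crosses zero, at which point the head-room to the next soft or hard limit is recomputed under the lock. A stripped-down sketch of that amortization idiom (not the actual kvbuffer bookkeeping):

import java.util.concurrent.locks.ReentrantLock;

// Amortized limit check: take the lock roughly once per `softLimit` units
// of work instead of once per call.
public class AmortizedChecker {
  private final ReentrantLock lock = new ReentrantLock();
  private final int softLimit;
  private int remaining;

  public AmortizedChecker(int softLimit) {
    this.softLimit = softLimit;
    this.remaining = softLimit;
  }

  public void consume(int units) {
    remaining -= units;            // cheap, uncontended fast path
    if (remaining <= 0) {
      lock.lock();
      try {
        // ... check hard limits, trigger a spill, or block here ...
        remaining = softLimit;     // recompute head-room under the lock
      } finally {
        lock.unlock();
      }
    }
  }
}

The real code recomputes the head-room from the current kvstart/kvend (or bufstart/bufend) positions rather than resetting it to a constant, which is why the counter can legitimately stay below zero while a spill is in progress.
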
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Jun  9 16:11:19 2009
@@ -97,6 +97,25 @@
                                                mergePhase);
   }
 
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
+                            CompressionCodec codec,
+                            List<Segment<K, V>> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter,
+                            boolean sortSegments,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                           sortSegments, codec).merge(keyClass, valueClass,
+                                               mergeFactor, tmpDir,
+                                               readsCounter, writesCounter,
+                                               mergePhase);
+  }
+
   static <K extends Object, V extends Object>
     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class<K> keyClass, Class<V> valueClass,
@@ -116,6 +135,27 @@
                                                mergePhase);
   }
 
+
+  static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class<K> keyClass, Class<V> valueClass,
+                          CompressionCodec codec,
+                          List<Segment<K, V>> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator<K> comparator, Progressable reporter,
+                          boolean sortSegments,
+                          Counters.Counter readsCounter,
+                          Counters.Counter writesCounter,
+                          Progress mergePhase)
+    throws IOException {
+  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                         sortSegments, codec).merge(keyClass, valueClass,
+                                             mergeFactor, inMemSegments,
+                                             tmpDir,
+                                             readsCounter, writesCounter,
+                                             mergePhase);
+}
+
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
                  Progressable progressable, Configuration conf) 
@@ -326,6 +366,13 @@
       }
     }
 
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
     public void close() throws IOException {
       Segment<K, V> segment;
       while((segment = pop()) != null) {
@@ -466,9 +513,9 @@
             long startPos = segment.getPosition();
             boolean hasNext = segment.nextRawKey();
             long endPos = segment.getPosition();
-            startBytes += endPos - startPos;
             
             if (hasNext) {
+              startBytes += endPos - startPos;
               segmentsToMerge.add(segment);
               segmentsConsidered++;
             }
@@ -644,13 +691,15 @@
         segmentSizes.add(segments.get(i).getLength());
       }
       
-      if (includeFinalMerge) {
-        // just increment so that the following while loop iterates
-        // for 1 more iteration. This is to include final merge as part of
-        // the computation of expected input bytes of merges
-        n++;
-      }
-      while (n > f) {
+      // If includeFinalMerge is true, allow the following while loop to run
+      // for 1 more iteration. This is to include the final merge as part of
+      // the computation of expected input bytes of merges
+      boolean considerFinalMerge = includeFinalMerge;
+      
+      while (n > f || considerFinalMerge) {
+        if (n <= f) {
+          considerFinalMerge = false;
+        }
         long mergedSize = 0;
         f = Math.min(f, segmentSizes.size());
         for (int j = 0; j < f; j++) {

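Besides threading a CompressionCodec through two new merge() overloads, the last hunk reworks how computeBytesInMerges accounts for the final merge: a considerFinalMerge flag lets the loop run exactly one extra pass once n drops to the merge factor, instead of artificially bumping n. Below is a simplified, self-contained model of that control flow, counting merge passes only and assuming a merge factor of at least two:

// Model of the loop structure in computeBytesInMerges(): iterate while more
// than `f` segments remain, plus one extra pass when includeFinalMerge is set.
public class MergePassModel {
  static int countMergePasses(int n, int f, boolean includeFinalMerge) {
    int passes = 0;
    boolean considerFinalMerge = includeFinalMerge;
    while (n > f || considerFinalMerge) {
      if (n <= f) {
        considerFinalMerge = false;   // this is the single extra (final) pass
      }
      n = n - Math.min(f, n) + 1;     // each pass merges up to f segments into 1
      passes++;
    }
    return passes;
  }
}

For 10 segments and a factor of 3, this gives 4 intermediate passes, or 5 when the final merge is included, matching the one-extra-iteration intent spelled out in the rewritten comment.
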
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Jun  9 16:11:19 2009
@@ -522,6 +522,28 @@
                      Class<INVALUE> valueClass
                      ) throws IOException,InterruptedException, 
                               ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final RawKeyValueIterator rawIter = rIter;
+    rIter = new RawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        reducePhase.set(rawIter.getProgress().getProgress());
+        reporter.progress();
+        return ret;
+      }
+    };
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
@@ -2323,7 +2345,7 @@
         memDiskSegments.clear();
         Progress mergePhase = (sortPhaseFinished) ? null : sortPhase; 
         RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, diskSegments,
+            job, fs, keyClass, valueClass, codec, diskSegments,
             ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
             tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
             mergePhase);

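The first ReduceTask hunk wraps the incoming RawKeyValueIterator in an anonymous decorator whose next() also sets the reduce-phase progress and pings the reporter. The same decorator shape is sketched below on a minimal, illustrative iterator interface (these types are stand-ins, not the Hadoop ones):

// Decorator that publishes progress on every next(); types are illustrative.
interface RecordIterator {
  boolean next();
  double getProgress();   // fraction consumed, 0.0 .. 1.0
}

interface ProgressReporter {
  void progress(double fraction);
}

class ReportingIterator implements RecordIterator {
  private final RecordIterator delegate;
  private final ProgressReporter reporter;

  ReportingIterator(RecordIterator delegate, ProgressReporter reporter) {
    this.delegate = delegate;
    this.reporter = reporter;
  }

  public boolean next() {
    boolean ret = delegate.next();               // advance the real iterator
    reporter.progress(delegate.getProgress());   // then report how far we are
    return ret;
  }

  public double getProgress() {
    return delegate.getProgress();
  }
}

All other methods simply forward to the delegate, just as the close()/getKey()/getValue() forwards do in the hunk above.
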
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Tue Jun  9 16:11:19 2009
@@ -82,7 +82,10 @@
    * @return estimated length of this job's average map output
    */
   long getEstimatedMapOutputSize() {
-    long estimate = getEstimatedTotalMapOutputSize()  / job.desiredMaps();
+    long estimate = 0L;
+    if (job.desiredMaps() > 0) {
+      estimate = getEstimatedTotalMapOutputSize()  / job.desiredMaps();
+    }
     return estimate;
   }
 

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Jun  9 16:11:19 2009
@@ -51,18 +51,26 @@
   private List<TaskAttemptID> tasksToBeRemoved;
 
   public TaskMemoryManagerThread(TaskTracker taskTracker) {
+    this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
+            taskTracker.getJobConf().getLong(
+                "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
+                5000L));         
     this.taskTracker = taskTracker;
+  }
+
+  // mainly for test purposes. note that the tasktracker variable is
+  // not set here.
+  TaskMemoryManagerThread(long maxMemoryAllowedForAllTasks,
+                            long monitoringInterval) {
     setName(this.getClass().getName());
 
     processTreeInfoMap = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
-    maxMemoryAllowedForAllTasks =
-        taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L;
+    this.maxMemoryAllowedForAllTasks = maxMemoryAllowedForAllTasks;
 
-    monitoringInterval = taskTracker.getJobConf().getLong(
-        "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
+    this.monitoringInterval = monitoringInterval;
   }
 
   public void addTask(TaskAttemptID tid, long memLimit) {
@@ -197,12 +205,15 @@
           ptInfo.setProcessTree(pTree); // update ptInfo with proces-tree of
                                         // updated state
           long currentMemUsage = pTree.getCumulativeVmem();
+          // as processes begin with an age of 1, we want to see if there
+          // are processes more than 1 iteration old.
+          long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
           long limit = ptInfo.getMemLimit();
           LOG.info("Memory usage of ProcessTree " + pId + " :"
               + currentMemUsage + "bytes. Limit : " + limit + "bytes");
 
-          if (limit != JobConf.DISABLED_MEMORY_LIMIT
-              && currentMemUsage > limit) {
+          if (isProcessTreeOverLimit(tid.toString(), currentMemUsage, 
+                                      curMemUsageOfAgedProcesses, limit)) {
             // Task (the root process) is still alive and overflowing memory.
             // Clean up.
             String msg =
@@ -251,6 +262,65 @@
     }
   }
 
+  /**
+   * Check whether a task's process tree's current memory usage is over limit.
+   * 
+   * When a Java process execs a program, it can momentarily account for
+   * double the size of its memory, because the JVM does a fork()+exec(),
+   * which at fork time creates a copy of the parent's memory. If the
+   * monitoring thread samples the memory used by the task tree at that
+   * instant, it could conclude the tree is over its limit and kill it,
+   * through no fault of the process itself.
+   * 
+   * We counter this problem with a heuristic check:
+   * - if a process tree uses more than twice the memory limit, it is
+   * killed immediately
+   * - if processes in the tree older than the monitoring interval exceed
+   * the memory limit even once, the tree is killed. Otherwise it is given
+   * the benefit of the doubt for one more iteration.
+   * 
+   * @param tId Task Id for the task tree
+   * @param currentMemUsage Memory usage of a task tree
+   * @param curMemUsageOfAgedProcesses Memory usage of processes older than
+   *                                    an iteration in a task tree
+   * @param limit The limit specified for the task
+   * @return true if the memory usage is more than twice the specified limit,
+   *              or if processes in the tree, older than this thread's 
+   *              monitoring interval, exceed the memory limit. False, 
+   *              otherwise.
+   */
+  boolean isProcessTreeOverLimit(String tId, 
+                                  long currentMemUsage, 
+                                  long curMemUsageOfAgedProcesses, 
+                                  long limit) {
+    boolean isOverLimit = false;
+    
+    if (currentMemUsage > (2*limit)) {
+      LOG.warn("Process tree for task: " + tId + " running over twice " +
+                "the configured limit. Limit=" + limit + 
+                ", current usage = " + currentMemUsage);
+      isOverLimit = true;
+    } else if (curMemUsageOfAgedProcesses > limit) {
+      LOG.warn("Process tree for task: " + tId + " has processes older than 1 " +
+          "iteration running over the configured limit. Limit=" + limit + 
+          ", current usage = " + curMemUsageOfAgedProcesses);
+      isOverLimit = true;
+    }
+
+    return isOverLimit; 
+  }
+
+  // method provided just for easy testing purposes
+  boolean isProcessTreeOverLimit(ProcfsBasedProcessTree pTree, 
+                                    String tId, long limit) {
+    long currentMemUsage = pTree.getCumulativeVmem();
+    // as processes begin with an age of 1, we want to see if there are
+    // processes more than 1 iteration old.
+    long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+    return isProcessTreeOverLimit(tId, currentMemUsage, 
+                                  curMemUsageOfAgedProcesses, limit);
+  }
+
   private void killTasksWithLeastProgress(long memoryStillInUsage) {
 
     List<TaskAttemptID> tasksToKill = new ArrayList<TaskAttemptID>();

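isProcessTreeOverLimit() above encodes the fork()+exec() heuristic from its javadoc: kill immediately only past twice the limit; otherwise kill only if processes older than one monitoring interval are already over the limit. The same decision logic as a standalone predicate, without the logging:

// Decision logic of isProcessTreeOverLimit(), stripped of logging.
public class OverLimitCheck {
  static boolean overLimit(long currentUsage, long agedUsage, long limit) {
    if (currentUsage > 2 * limit) {
      return true;               // far over the limit: kill even a fresh fork
    }
    return agedUsage > limit;    // over the limit and old enough to be real
  }
}

For example, with a 512 MB limit, a tree using 900 MB whose aged processes use only 300 MB gets one more iteration (a plausible fork()+exec() spike), while the same 900 MB with 600 MB of aged usage is killed.
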
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Tue Jun  9 16:11:19 2009
@@ -399,6 +399,25 @@
         ldLibraryPath.append(oldLdLibraryPath);
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+      
+      // add the env variables passed by the user
+      String mapredChildEnv = conf.get("mapred.child.env");
+      if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+        String childEnvs[] = mapredChildEnv.split(",");
+        for (String cEnv : childEnvs) {
+          String[] parts = cEnv.split("="); // split on '='
+          String value = env.get(parts[0]);
+          if (value != null) {
+            // replace $env with the tt's value of env
+            value = parts[1].replaceAll("$" + parts[0], value);
+          } else {
+            // for cases where x=$x:/tmp is passed and x doesn't exist
+            value = parts[1].replaceAll("$" + parts[0], "");
+          }
+          env.put(parts[0], value);
+        }
+      }
+
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
               workDir, env, conf));

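The new TaskRunner block splits mapred.child.env on commas, splits each entry on '=', and substitutes the tasktracker's value for a $NAME reference (or the empty string if the variable is unset). Below is a hedged standalone sketch of that expansion; it deliberately uses literal String.replace() rather than replaceAll(), since '$' is a regex metacharacter, and it illustrates the intended behaviour rather than reproducing the patch itself:

import java.util.Map;

// Expand entries like "X=$X:/tmp,Y=bar" against an existing environment map.
public class ChildEnv {
  static void apply(String spec, Map<String, String> env) {
    if (spec == null || spec.length() == 0) {
      return;
    }
    for (String entry : spec.split(",")) {
      String[] parts = entry.split("=", 2);     // variable name, raw value
      String name = parts[0];
      String raw = parts.length > 1 ? parts[1] : "";
      String existing = env.get(name);
      // "$NAME" refers to the tasktracker's value; empty if it is not set
      String value = raw.replace("$" + name, existing != null ? existing : "");
      env.put(name, value);
    }
  }
}

With LD_LIBRARY_PATH=/usr/lib already in the map, the entry "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/native" expands to "/usr/lib:/opt/native".
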
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Jun  9 16:11:19 2009
@@ -1783,7 +1783,7 @@
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
-      LOG.warn(msg);
+      LOG.warn(msg, e);
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Tue Jun  9 16:11:19 2009
@@ -19,28 +19,14 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.NetworkTopology;
 
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
@@ -60,72 +46,13 @@
  * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
  * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
  * @see CombineFileSplit
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat}
  */
+@Deprecated
 public abstract class CombineFileInputFormat<K, V>
-  extends FileInputFormat<K, V> {
-
-  // ability to limit the size of a single split
-  private long maxSplitSize = 0;
-  private long minSplitSizeNode = 0;
-  private long minSplitSizeRack = 0;
-
-  // A pool of input paths filters. A split cannot have blocks from files
-  // across multiple pools.
-  private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
-
-  // mapping from a rack name to the set of Nodes in the rack 
-  private static HashMap<String, Set<String>> rackToNodes = 
-                            new HashMap<String, Set<String>>();
-  /**
-   * Specify the maximum size (in bytes) of each split. Each split is
-   * approximately equal to the specified size.
-   */
-  protected void setMaxSplitSize(long maxSplitSize) {
-    this.maxSplitSize = maxSplitSize;
-  }
-
-  /**
-   * Specify the minimum size (in bytes) of each split per node.
-   * This applies to data that is left over after combining data on a single
-   * node into splits that are of maximum size specified by maxSplitSize.
-   * This leftover data will be combined into its own split if its size
-   * exceeds minSplitSizeNode.
-   */
-  protected void setMinSplitSizeNode(long minSplitSizeNode) {
-    this.minSplitSizeNode = minSplitSizeNode;
-  }
-
-  /**
-   * Specify the minimum size (in bytes) of each split per rack.
-   * This applies to data that is left over after combining data on a single
-   * rack into splits that are of maximum size specified by maxSplitSize.
-   * This leftover data will be combined into its own split if its size
-   * exceeds minSplitSizeRack.
-   */
-  protected void setMinSplitSizeRack(long minSplitSizeRack) {
-    this.minSplitSizeRack = minSplitSizeRack;
-  }
-
-  /**
-   * Create a new pool and add the filters to it.
-   * A split cannot have files from different pools.
-   */
-  protected void createPool(JobConf conf, List<PathFilter> filters) {
-    pools.add(new MultiPathFilter(filters));
-  }
-
-  /**
-   * Create a new pool and add the filters to it. 
-   * A pathname can satisfy any one of the specified filters.
-   * A split cannot have files from different pools.
-   */
-  protected void createPool(JobConf conf, PathFilter... filters) {
-    MultiPathFilter multi = new MultiPathFilter();
-    for (PathFilter f: filters) {
-      multi.add(f);
-    }
-    pools.add(multi);
-  }
+  extends org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<K, V> 
+  implements InputFormat<K, V>{
 
   /**
    * default constructor
@@ -133,306 +60,9 @@
   public CombineFileInputFormat() {
   }
 
-  @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) 
     throws IOException {
-
-    long minSizeNode = 0;
-    long minSizeRack = 0;
-    long maxSize = 0;
-
-    // the values specified by setxxxSplitSize() takes precedence over the
-    // values that might have been specified in the config
-    if (minSplitSizeNode != 0) {
-      minSizeNode = minSplitSizeNode;
-    } else {
-      minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
-    }
-    if (minSplitSizeRack != 0) {
-      minSizeRack = minSplitSizeRack;
-    } else {
-      minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
-    }
-    if (maxSplitSize != 0) {
-      maxSize = maxSplitSize;
-    } else {
-      maxSize = job.getLong("mapred.max.split.size", 0);
-    }
-    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
-      throw new IOException("Minimum split size pernode " + minSizeNode +
-                            " cannot be larger than maximum split size " +
-                            maxSize);
-    }
-    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
-      throw new IOException("Minimum split size per rack" + minSizeRack +
-                            " cannot be larger than maximum split size " +
-                            maxSize);
-    }
-    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
-      throw new IOException("Minimum split size per node" + minSizeNode +
-                            " cannot be smaller than minimum split size per rack " +
-                            minSizeRack);
-    }
-
-    // all the files in input set
-    Path[] paths = FileUtil.stat2Paths(listStatus(job));
-    List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
-    if (paths.length == 0) {
-      return splits.toArray(new CombineFileSplit[splits.size()]);    
-    }
-
-    // In one single iteration, process all the paths in a single pool.
-    // Processing one pool at a time ensures that a split contans paths
-    // from a single pool only.
-    for (MultiPathFilter onepool : pools) {
-      ArrayList<Path> myPaths = new ArrayList<Path>();
-      
-      // pick one input path. If it matches all the filters in a pool,
-      // add it to the output set
-      for (int i = 0; i < paths.length; i++) {
-        if (paths[i] == null) {  // already processed
-          continue;
-        }
-        FileSystem fs = paths[i].getFileSystem(job);
-        Path p = new Path(paths[i].toUri().getPath());
-        if (onepool.accept(p)) {
-          myPaths.add(paths[i]); // add it to my output set
-          paths[i] = null;       // already processed
-        }
-      }
-      // create splits for all files in this pool.
-      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
-                    maxSize, minSizeNode, minSizeRack, splits);
-    }
-
-    // Finally, process all paths that do not belong to any pool.
-    ArrayList<Path> myPaths = new ArrayList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      if (paths[i] == null) {  // already processed
-        continue;
-      }
-      myPaths.add(paths[i]);
-    }
-    // create splits for all files that are not in any pool.
-    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
-                  maxSize, minSizeNode, minSizeRack, splits);
-
-    // free up rackToNodes map
-    rackToNodes.clear();
-    return splits.toArray(new CombineFileSplit[splits.size()]);    
-  }
-
-  /**
-   * Return all the splits in the specified set of paths
-   */
-  private void getMoreSplits(JobConf job, Path[] paths, 
-                             long maxSize, long minSizeNode, long minSizeRack,
-                             List<CombineFileSplit> splits)
-    throws IOException {
-
-    // all blocks for all the files in input set
-    OneFileInfo[] files;
-  
-    // mapping from a rack name to the list of blocks it has
-    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
-
-    // mapping from a block to the nodes on which it has replicas
-    HashMap<OneBlockInfo, String[]> blockToNodes = 
-                              new HashMap<OneBlockInfo, String[]>();
-
-    // mapping from a node to the list of blocks that it contains
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
-    
-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
-      return; 
-    }
-
-    // populate all the blocks for all files
-    long totLength = 0;
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], job, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks);
-      totLength += files[i].getLength();
-    }
-
-    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> nodes = new ArrayList<String>();
-    long curSplitSize = 0;
-
-    // process all nodes and create splits that are local
-    // to a node. 
-    for (Iterator<Map.Entry<String, 
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from 
-      // blockToNodes so that the same block does not appear in 
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then 
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(job, splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
-          }
-        }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely 
-      // that they will be combined with other blocks from the same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
-        }
-      }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
-    }
-
-    // if blocks in a rack are below the specified minimum size, then keep them
-    // in 'overflow'. After the processing of all racks is complete, these overflow
-    // blocks will be combined into splits.
-    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> racks = new ArrayList<String>();
-
-    // Process all racks over and over again until there is no more work to do.
-    while (blockToNodes.size() > 0) {
-
-      // Create one split for this rack before moving over to the next rack. 
-      // Come back to this rack after creating a single split for each of the 
-      // remaining racks.
-      // Process one rack location at a time, Combine all possible blocks that
-      // reside on this rack as one split. (constrained by minimum and maximum
-      // split size).
-
-      // iterate over all racks 
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
-           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
-
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        racks.add(one.getKey());
-        List<OneBlockInfo> blocks = one.getValue();
-
-        // for each block, copy it into validBlocks. Delete it from 
-        // blockToNodes so that the same block does not appear in 
-        // two different splits.
-        boolean createdSplit = false;
-        for (OneBlockInfo oneblock : blocks) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-      
-            // if the accumulated split size exceeds the maximum, then 
-            // create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-              createdSplit = true;
-              break;
-            }
-          }
-        }
-
-        // if we created a split, then just go to the next rack
-        if (createdSplit) {
-          curSplitSize = 0;
-          validBlocks.clear();
-          racks.clear();
-          continue;
-        }
-
-        if (!validBlocks.isEmpty()) {
-          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
-            // if there is a mimimum size specified, then create a single split
-            // otherwise, store these blocks into overflow data structure
-            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-          } else {
-            // There were a few blocks in this rack that remained to be processed.
-            // Keep them in 'overflow' block list. These will be combined later.
-            overflowBlocks.addAll(validBlocks);
-          }
-        }
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    assert blockToNodes.isEmpty();
-    assert curSplitSize == 0;
-    assert validBlocks.isEmpty();
-    assert racks.isEmpty();
-
-    // Process all overflow blocks
-    for (OneBlockInfo oneblock : overflowBlocks) {
-      validBlocks.add(oneblock);
-      curSplitSize += oneblock.length;
-
-      // This might cause an exiting rack location to be re-added,
-      // but it should be ok.
-      for (int i = 0; i < oneblock.racks.length; i++) {
-        racks.add(oneblock.racks[i]);
-      }
-
-      // if the accumulated split size exceeds the maximum, then 
-      // create this split.
-      if (maxSize != 0 && curSplitSize >= maxSize) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    // Process any remaining blocks, if any.
-    if (!validBlocks.isEmpty()) {
-      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-    }
-  }
-
-  /**
-   * Create a single split from the list of blocks specified in validBlocks
-   * Add this new split into splitList.
-   */
-  private void addCreatedSplit(JobConf job,
-                               List<CombineFileSplit> splitList, 
-                               List<String> locations, 
-                               ArrayList<OneBlockInfo> validBlocks) {
-    // create an input split
-    Path[] fl = new Path[validBlocks.size()];
-    long[] offset = new long[validBlocks.size()];
-    long[] length = new long[validBlocks.size()];
-    for (int i = 0; i < validBlocks.size(); i++) {
-      fl[i] = validBlocks.get(i).onepath; 
-      offset[i] = validBlocks.get(i).offset;
-      length[i] = validBlocks.get(i).length;
-    }
-
-     // add this split to the list that is returned
-    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, 
-                                   length, locations.toArray(new String[0]));
-    splitList.add(thissplit); 
+    return super.getSplits(new Job(job)).toArray(new InputSplit[0]);
   }
 
   /**
@@ -442,171 +72,11 @@
                                       JobConf job, Reporter reporter)
     throws IOException;
 
-  /**
-   * information about one file from the File System
-   */
-  private static class OneFileInfo {
-    private long fileSize;               // size of the file
-    private OneBlockInfo[] blocks;       // all blocks in this file
-
-    OneFileInfo(Path path, JobConf job,
-                HashMap<String, List<OneBlockInfo>> rackToBlocks,
-                HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
-                throws IOException {
-      this.fileSize = 0;
-
-      // get block locations from file system
-      FileSystem fs = path.getFileSystem(job);
-      FileStatus stat = fs.getFileStatus(path);
-      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
-                                                           stat.getLen());
-      // create a list of all block and their locations
-      if (locations == null) {
-        blocks = new OneBlockInfo[0];
-      } else {
-        blocks = new OneBlockInfo[locations.length];
-        for (int i = 0; i < locations.length; i++) {
-           
-          fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       locations[i].getOffset(), 
-                                       locations[i].getLength(),
-                                       locations[i].getHosts(),
-                                       locations[i].getTopologyPaths()); 
-          blocks[i] = oneblock;
-
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            // Add this host to rackToNodes map
-            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
-         }
-
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
-          }
-        }
-      }
-    }
-
-    long getLength() {
-      return fileSize;
-    }
-
-    OneBlockInfo[] getBlocks() {
-      return blocks;
-    }
-  }
-
-  /**
-   * information about one block from the File System
-   */
-  private static class OneBlockInfo {
-    Path onepath;                // name of this file
-    long offset;                 // offset in file
-    long length;                 // length of this block
-    String[] hosts;              // nodes on which this block resides
-    String[] racks;              // network topology of hosts
-
-    OneBlockInfo(Path path, long offset, long len, 
-                 String[] hosts, String[] topologyPaths) {
-      this.onepath = path;
-      this.offset = offset;
-      this.hosts = hosts;
-      this.length = len;
-      assert (hosts.length == topologyPaths.length ||
-              topologyPaths.length == 0);
-
-      // if the file system does not have any rack information, then
-      // use dummy rack location.
-      if (topologyPaths.length == 0) {
-        topologyPaths = new String[hosts.length];
-        for (int i = 0; i < topologyPaths.length; i++) {
-          topologyPaths[i] = (new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK)).
-                                          toString();
-        }
-      }
-
-      // The topology paths have the host name included as the last 
-      // component. Strip it.
-      this.racks = new String[topologyPaths.length];
-      for (int i = 0; i < topologyPaths.length; i++) {
-        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
-      }
-    }
+  // Abstract method from the superclass, implemented here to return null.
+  public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(
+      org.apache.hadoop.mapreduce.InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return null;
   }
 
-  private static void addHostToRack(String rack, String host) {
-    Set<String> hosts = rackToNodes.get(rack);
-    if (hosts == null) {
-      hosts = new HashSet<String>();
-      rackToNodes.put(rack, hosts);
-    }
-    hosts.add(host);
-  }
-  
-  private static List<String> getHosts(List<String> racks) {
-    List<String> hosts = new ArrayList<String>();
-    for (String rack : racks) {
-      hosts.addAll(rackToNodes.get(rack));
-    }
-    return hosts;
-  }
-  
-  /**
-   * Accept a path only if any one of filters given in the
-   * constructor do. 
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter() {
-      this.filters = new ArrayList<PathFilter>();
-    }
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public void add(PathFilter one) {
-      filters.add(one);
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (filter.accept(path)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    public String toString() {
-      StringBuffer buf = new StringBuffer();
-      buf.append("[");
-      for (PathFilter f: filters) {
-        buf.append(f);
-        buf.append(",");
-      }
-      buf.append("]");
-      return buf.toString();
-    }
-  }
 }

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java Tue Jun  9 16:11:19 2009
@@ -19,11 +19,9 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.*;
-import java.util.*;
 import java.lang.reflect.*;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.conf.Configuration;
@@ -35,8 +33,10 @@
  * This class allows using different RecordReaders for processing
  * these data chunks from different files.
  * @see CombineFileSplit
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader}
  */
-
+@Deprecated
 public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
 
   static final Class [] constructorSignature = new Class [] 

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java Tue Jun  9 16:11:19 2009
@@ -18,188 +18,30 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.io.Text;
-
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
- * A sub-collection of input files. Unlike {@link org.apache.hadoop.mapred.FileSplit}, 
- * CombineFileSplit class does not represent a split of a file, but a split of input files 
- * into smaller sets. A split may contain blocks from different files, but all 
- * the blocks in the same split are probably local to some rack. <br> 
- * CombineFileSplit can be used to implement {@link org.apache.hadoop.mapred.RecordReader}'s, 
- * with reading one record per file.
- * @see org.apache.hadoop.mapred.FileSplit
- * @see CombineFileInputFormat 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileSplit}
  */
-public class CombineFileSplit implements InputSplit {
+@Deprecated
+public class CombineFileSplit extends 
+    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit 
+    implements InputSplit {
 
-  private Path[] paths;
-  private long[] startoffset;
-  private long[] lengths;
-  private String[] locations;
-  private long totLength;
   private JobConf job;
 
-  /**
-   * default constructor
-   */
-  public CombineFileSplit() {}
-  public CombineFileSplit(JobConf job, Path[] files, long[] start, 
-                          long[] lengths, String[] locations) {
-    initSplit(job, files, start, lengths, locations);
+  public CombineFileSplit() {
   }
 
   public CombineFileSplit(JobConf job, Path[] files, long[] lengths) {
-    long[] startoffset = new long[files.length];
-    for (int i = 0; i < startoffset.length; i++) {
-      startoffset[i] = 0;
-    }
-    String[] locations = new String[files.length];
-    for (int i = 0; i < locations.length; i++) {
-      locations[i] = "";
-    }
-    initSplit(job, files, startoffset, lengths, locations);
-  }
-  
-  private void initSplit(JobConf job, Path[] files, long[] start, 
-                         long[] lengths, String[] locations) {
+    super(files, lengths);
     this.job = job;
-    this.startoffset = start;
-    this.lengths = lengths;
-    this.paths = files;
-    this.totLength = 0;
-    this.locations = locations;
-    for(long length : lengths) {
-      totLength += length;
-    }
-  }
-
-  /**
-   * Copy constructor
-   */
-  public CombineFileSplit(CombineFileSplit old) throws IOException {
-    this(old.getJob(), old.getPaths(), old.getStartOffsets(),
-         old.getLengths(), old.getLocations());
   }
 
   public JobConf getJob() {
     return job;
   }
-
-  public long getLength() {
-    return totLength;
-  }
-
-  /** Returns an array containing the startoffsets of the files in the split*/ 
-  public long[] getStartOffsets() {
-    return startoffset;
-  }
-  
-  /** Returns an array containing the lengths of the files in the split*/ 
-  public long[] getLengths() {
-    return lengths;
-  }
-
-  /** Returns the start offset of the i<sup>th</sup> Path */
-  public long getOffset(int i) {
-    return startoffset[i];
-  }
-  
-  /** Returns the length of the i<sup>th</sup> Path */
-  public long getLength(int i) {
-    return lengths[i];
-  }
-  
-  /** Returns the number of Paths in the split */
-  public int getNumPaths() {
-    return paths.length;
-  }
-
-  /** Returns the i<sup>th</sup> Path */
-  public Path getPath(int i) {
-    return paths[i];
-  }
-  
-  /** Returns all the Paths in the split */
-  public Path[] getPaths() {
-    return paths;
-  }
-
-  /** Returns all the Paths where this input-split resides */
-  public String[] getLocations() throws IOException {
-    return locations;
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    totLength = in.readLong();
-    int arrLength = in.readInt();
-    lengths = new long[arrLength];
-    for(int i=0; i<arrLength;i++) {
-      lengths[i] = in.readLong();
-    }
-    int filesLength = in.readInt();
-    paths = new Path[filesLength];
-    for(int i=0; i<filesLength;i++) {
-      paths[i] = new Path(Text.readString(in));
-    }
-    arrLength = in.readInt();
-    startoffset = new long[arrLength];
-    for(int i=0; i<arrLength;i++) {
-      startoffset[i] = in.readLong();
-    }
-  }
-
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(totLength);
-    out.writeInt(lengths.length);
-    for(long length : lengths) {
-      out.writeLong(length);
-    }
-    out.writeInt(paths.length);
-    for(Path p : paths) {
-      Text.writeString(out, p.toString());
-    }
-    out.writeInt(startoffset.length);
-    for(long length : startoffset) {
-      out.writeLong(length);
-    }
-  }
-  
-  @Override
- public String toString() {
-    StringBuffer sb = new StringBuffer();
-    for (int i = 0; i < paths.length; i++) {
-      if (i == 0 ) {
-        sb.append("Paths:");
-      }
-      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
-                "+" + lengths[i]);
-      if (i < paths.length -1) {
-        sb.append(",");
-      }
-    }
-    if (locations != null) {
-      String locs = "";
-      StringBuffer locsb = new StringBuffer();
-      for (int i = 0; i < locations.length; i++) {
-        locsb.append(locations[i] + ":");
-      }
-      locs = locsb.toString();
-      sb.append(" Locations:" + locs + "; ");
-    }
-    return sb.toString();
-  }
 }
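
The deprecated mapred CombineFileSplit above now keeps only the JobConf field and the constructors shown, delegating everything else to the new mapreduce class. Below is a minimal usage sketch; it assumes the removed getters (getNumPaths(), getLength(), and so on) are inherited unchanged from org.apache.hadoop.mapreduce.lib.input.CombineFileSplit, and the class name and paths are illustrative only:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombineFileSplitSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    Path[] files = { new Path("/data/a.txt"), new Path("/data/b.txt") };
    long[] lengths = { 1024L, 2048L };

    // The surviving (JobConf, Path[], long[]) constructor delegates to
    // super(files, lengths), so every start offset defaults to zero.
    CombineFileSplit split = new CombineFileSplit(job, files, lengths);

    System.out.println(split.getNumPaths());   // 2
    System.out.println(split.getLength());     // 3072
    System.out.println(split.getJob() == job); // true
  }
}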

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/bin/test-patch.sh
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/bin/test-patch.sh?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/bin/test-patch.sh (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/bin/test-patch.sh Tue Jun  9 16:11:19 2009
@@ -362,7 +362,7 @@
         echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." > $PATCH_DIR/releaseAuditDiffWarnings.txt
         echo "" > $PATCH_DIR/releaseAuditDiffWarnings.txt
         diff $PATCH_DIR/patchReleaseAuditProblems.txt $PATCH_DIR/trunkReleaseAuditProblems.txt >> $PATCH_DIR/releaseAuditDiffWarnings.txt
-        JIRA_COMMENT_FOOTER="Release audit warnings: http://hudson.zones.apache.org/hudson/job/$JOB_NAME/$BUILD_NUMBER/artifact/trunk/current/releaseAuditDiffWarnings.txt
+        JIRA_COMMENT_FOOTER="Release audit warnings: http://hudson.zones.apache.org/hudson/job/$JOB_NAME/$BUILD_NUMBER/artifact/trunk/patchprocess/releaseAuditDiffWarnings.txt
 $JIRA_COMMENT_FOOTER"
         return 1
       fi

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java Tue Jun  9 16:11:19 2009
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
 
 public abstract class S3FileSystemContractBaseTest
   extends FileSystemContractBaseTest {
@@ -45,4 +46,15 @@
     super.tearDown();
   }
   
+  public void testBlockSize() throws Exception {
+    
+    long newBlockSize = fs.getDefaultBlockSize() * 2;
+    fs.getConf().setLong("fs.s3.block.size", newBlockSize);
+    
+    Path file = path("/test/hadoop/file");
+    createFile(file);
+    assertEquals("Double default block size", newBlockSize,
+        fs.getFileStatus(file).getBlockSize());
+  }
+  
 }

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java Tue Jun  9 16:11:19 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3native;
 
 import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
@@ -122,19 +123,13 @@
 
   public PartialListing list(String prefix, int maxListingLength)
       throws IOException {
-    return list(prefix, maxListingLength, null);
+    return list(prefix, maxListingLength, null, false);
   }
 
   public PartialListing list(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
-
-    return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
-  }
-
-  public PartialListing listAll(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
+      String priorLastKey, boolean recursive) throws IOException {
 
-    return list(prefix, null, maxListingLength, priorLastKey);
+    return list(prefix, recursive ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
   }
 
   private PartialListing list(String prefix, String delimiter,
@@ -174,9 +169,9 @@
     dataMap.remove(key);
   }
 
-  public void rename(String srcKey, String dstKey) throws IOException {
-    metadataMap.put(dstKey, metadataMap.remove(srcKey));
-    dataMap.put(dstKey, dataMap.remove(srcKey));
+  public void copy(String srcKey, String dstKey) throws IOException {
+    metadataMap.put(dstKey, metadataMap.get(srcKey));
+    dataMap.put(dstKey, dataMap.get(srcKey));
   }
   
   public void purge(String prefix) throws IOException {
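
A self-contained sketch of the behavioural change in the hunk above: the old rename() moved the map entries (remove then put), while the new copy() reads them with get() and leaves the source key in place, so rename semantics now have to be built from copy-then-delete by the caller. Plain maps stand in for the store's metadataMap/dataMap, and the key names are illustrative only:

import java.util.SortedMap;
import java.util.TreeMap;

public class CopyThenDeleteSketch {
  public static void main(String[] args) {
    SortedMap<String, byte[]> dataMap = new TreeMap<String, byte[]>();
    dataMap.put("test/hadoop/file1", new byte[0]);

    // copy(): duplicate the value under the destination key; the source survives
    dataMap.put("test/hadoop2/file1", dataMap.get("test/hadoop/file1"));

    // a rename built on copy() must then delete the source key explicitly
    dataMap.remove("test/hadoop/file1");

    System.out.println(dataMap.keySet());  // [test/hadoop2/file1]
  }
}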

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java Tue Jun  9 16:11:19 2009
@@ -56,4 +56,94 @@
     assertEquals(path("/test"), paths[0].getPath());
   }
   
+  public void testNoTrailingBackslashOnBucket() throws Exception {
+    assertTrue(fs.getFileStatus(new Path(fs.getUri().toString())).isDir());
+  }
+
+  private void createTestFiles(String base) throws IOException {
+    store.storeEmptyFile(base + "/file1");
+    store.storeEmptyFile(base + "/dir/file2");
+    store.storeEmptyFile(base + "/dir/file3");
+  }
+
+  public void testDirWithDifferentMarkersWorks() throws Exception {
+
+    for (int i = 0; i < 4; i++) {
+      String base = "test/hadoop" + i;
+      Path path = path("/" + base);
+
+      createTestFiles(base);
+
+      if (i == 0 ) {
+        //do nothing, we are testing correctness with no markers
+      }
+      else if (i == 1) {
+        // test for _$folder$ marker
+        store.storeEmptyFile(base + "_$folder$");
+        store.storeEmptyFile(base + "/dir_$folder$");
+      }
+      else if (i == 2) {
+        // test the end slash file marker
+        store.storeEmptyFile(base + "/");
+        store.storeEmptyFile(base + "/dir/");
+      }
+      else if (i == 3) {
+        // test both markers
+        store.storeEmptyFile(base + "_$folder$");
+        store.storeEmptyFile(base + "/dir_$folder$");
+        store.storeEmptyFile(base + "/");
+        store.storeEmptyFile(base + "/dir/");
+      }
+
+      assertTrue(fs.getFileStatus(path).isDir());
+      assertEquals(2, fs.listStatus(path).length);
+    }
+  }
+
+  public void testDeleteWithNoMarker() throws Exception {
+    String base = "test/hadoop";
+    Path path = path("/" + base);
+
+    createTestFiles(base);
+
+    fs.delete(path, true);
+
+    path = path("/test");
+    assertTrue(fs.getFileStatus(path).isDir());
+    assertEquals(0, fs.listStatus(path).length);
+  }
+
+  public void testRenameWithNoMarker() throws Exception {
+    String base = "test/hadoop";
+    Path dest = path("/test/hadoop2");
+
+    createTestFiles(base);
+
+    fs.rename(path("/" + base), dest);
+
+    Path path = path("/test");
+    assertTrue(fs.getFileStatus(path).isDir());
+    assertEquals(1, fs.listStatus(path).length);
+    assertTrue(fs.getFileStatus(dest).isDir());
+    assertEquals(2, fs.listStatus(dest).length);
+  }
+
+  public void testEmptyFile() throws Exception {
+    store.storeEmptyFile("test/hadoop/file1");
+    fs.open(path("/test/hadoop/file1")).close();
+  }
+  
+  public void testBlockSize() throws Exception {
+    Path file = path("/test/hadoop/file");
+    createFile(file);
+    assertEquals("Default block size", fs.getDefaultBlockSize(),
+        fs.getFileStatus(file).getBlockSize());
+
+    // Block size is determined at read time
+    long newBlockSize = fs.getDefaultBlockSize() * 2;
+    fs.getConf().setLong("fs.s3n.block.size", newBlockSize);
+    assertEquals("Double default block size", newBlockSize,
+        fs.getFileStatus(file).getBlockSize());
+  }
+
 }
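
The two block-size tests above (fs.s3.block.size for the block-based store, fs.s3n.block.size for the native store) show that the reported block size comes from configuration rather than from anything stored with the file. A small sketch of overriding them from client code; the 64 MB value and class name are arbitrary choices for the example:

import org.apache.hadoop.conf.Configuration;

public class S3BlockSizeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    long blockSize = 64L * 1024 * 1024;           // illustrative value only
    conf.setLong("fs.s3.block.size", blockSize);  // block-based s3:// store
    conf.setLong("fs.s3n.block.size", blockSize); // native s3n:// store

    // For s3n the value is applied when the file status is read, as the
    // "Block size is determined at read time" comment in the test notes.
    System.out.println(conf.getLong("fs.s3n.block.size", 0));
  }
}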

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Tue Jun  9 16:11:19 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.util;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
@@ -29,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -231,4 +233,236 @@
     }
     return pid;
   }
+  
+  public static class ProcessStatInfo {
+    // sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624 
+    // 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640 
+    // 134590050 3220521392 3220520036 10975138 0 0 4096 134234626 
+    // 4294967295 0 0 17 1 0 0
+    String pid;
+    String name;
+    String ppid;
+    String pgrpId;
+    String session;
+    String vmem;
+    
+    public ProcessStatInfo(String[] statEntries) {
+      pid = statEntries[0];
+      name = statEntries[1];
+      ppid = statEntries[2];
+      pgrpId = statEntries[3];
+      session = statEntries[4];
+      vmem = statEntries[5];
+    }
+    
+    // construct a line that mimics the procfs stat file.
+    // all unused numerical entries are set to 0.
+    public String getStatLine() {
+      return String.format("%s (%s) S %s %s %s 0 0 0" +
+                      " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s 0 0 0" +
+                      " 0 0 0 0 0 0 0 0" +
+                      " 0 0 0 0 0", 
+                      pid, name, ppid, pgrpId, session, vmem);
+    }
+  }
+  
+  /**
+   * A basic test that creates a few process directories and writes
+   * stat files. Verifies that the virtual memory is correctly  
+   * computed.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  public void testVirtualMemoryForProcessTree() throws IOException {
+
+    // test processes
+    String[] pids = { "100", "200", "300", "400" };
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      setupPidDirs(procfsRootDir, pids);
+      
+      // create stat objects.
+      // assuming processes 100, 200, 300 are in tree and 400 is not.
+      ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+      procInfos[0] = new ProcessStatInfo(new String[] 
+                                  {"100", "proc1", "1", "100", "100", "100000"});
+      procInfos[1] = new ProcessStatInfo(new String[] 
+                                  {"200", "proc2", "100", "100", "100", "200000"});
+      procInfos[2] = new ProcessStatInfo(new String[] 
+                                  {"300", "proc3", "200", "100", "100", "300000"});
+      procInfos[3] = new ProcessStatInfo(new String[] 
+                                  {"400", "proc4", "1", "400", "400", "400000"});
+      
+      writeStatFiles(procfsRootDir, pids, procInfos);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree = 
+          new ProcfsBasedProcessTree("100", true, 100L, 
+                                  procfsRootDir.getAbsolutePath());
+      // build the process tree.
+      processTree.getProcessTree();
+      
+      // verify cumulative memory
+      assertEquals("Cumulative memory does not match", 
+              Long.parseLong("600000"), processTree.getCumulativeVmem());
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+  
+  /**
+   * Tests that cumulative memory is computed only for
+   * processes older than a given age.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  public void testVMemForOlderProcesses() throws IOException {
+    // initial list of processes
+    String[] pids = { "100", "200", "300", "400" };
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      setupPidDirs(procfsRootDir, pids);
+      
+      // create stat objects.
+      // assuming 100, 200 and 400 are in tree, 300 is not.
+      ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+      procInfos[0] = new ProcessStatInfo(new String[] 
+                                  {"100", "proc1", "1", "100", "100", "100000"});
+      procInfos[1] = new ProcessStatInfo(new String[] 
+                                  {"200", "proc2", "100", "100", "100", "200000"});
+      procInfos[2] = new ProcessStatInfo(new String[] 
+                                  {"300", "proc3", "1", "300", "300", "300000"});
+      procInfos[3] = new ProcessStatInfo(new String[] 
+                                  {"400", "proc4", "100", "100", "100", "400000"});
+      
+      writeStatFiles(procfsRootDir, pids, procInfos);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree = 
+          new ProcfsBasedProcessTree("100", true, 100L, 
+                                  procfsRootDir.getAbsolutePath());
+      // build the process tree.
+      processTree.getProcessTree();
+      
+      // verify cumulative memory
+      assertEquals("Cumulative memory does not match", 
+              Long.parseLong("700000"), processTree.getCumulativeVmem());
+      
+      // write one more process as child of 100.
+      String[] newPids = { "500" };
+      setupPidDirs(procfsRootDir, newPids);
+      
+      ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1];
+      newProcInfos[0] = new ProcessStatInfo(new String[]
+                             {"500", "proc5", "100", "100", "100", "500000"});
+      writeStatFiles(procfsRootDir, newPids, newProcInfos);
+      
+      // check vmem includes the new process.
+      processTree.getProcessTree();
+      assertEquals("Cumulative memory does not include new process",
+              Long.parseLong("1200000"), processTree.getCumulativeVmem());
+      
+      // however processes older than 1 iteration will retain the older value
+      assertEquals("Cumulative memory shouldn't have included new process",
+              Long.parseLong("700000"), processTree.getCumulativeVmem(1));
+      
+      // one more process
+      newPids = new String[]{ "600" };
+      setupPidDirs(procfsRootDir, newPids);
+      
+      newProcInfos = new ProcessStatInfo[1];
+      newProcInfos[0] = new ProcessStatInfo(new String[]
+                                     {"600", "proc6", "100", "100", "100", "600000"});
+      writeStatFiles(procfsRootDir, newPids, newProcInfos);
+
+      // refresh process tree
+      processTree.getProcessTree();
+      
+      // processes older than 2 iterations should be same as before.
+      assertEquals("Cumulative memory shouldn't have included new processes",
+          Long.parseLong("700000"), processTree.getCumulativeVmem(2));
+      
+      // processes older than 1 iteration should not include new process,
+      // but include process 500
+      assertEquals("Cumulative memory shouldn't have included new processes",
+          Long.parseLong("1200000"), processTree.getCumulativeVmem(1));
+      
+      // no processes older than 3 iterations, this should be 0
+      assertEquals("Getting non-zero vmem for processes older than 3 iterations",
+                    0L, processTree.getCumulativeVmem(3));
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+
+  /**
+   * Create a directory to mimic the procfs file system's root.
+   * @param procfsRootDir root directory to create.
+   * @throws IOException if an existing procfs root directory could not be deleted
+   */
+  public static void setupProcfsRootDir(File procfsRootDir) 
+                                        throws IOException { 
+    // cleanup any existing process root dir.
+    if (procfsRootDir.exists()) {
+      assertTrue(FileUtil.fullyDelete(procfsRootDir));  
+    }
+
+    // create afresh
+    assertTrue(procfsRootDir.mkdirs());
+  }
+
+  /**
+   * Create PID directories under the specified procfs root directory
+   * @param procfsRootDir root directory of procfs file system
+   * @param pids the PID directories to create.
+   * @throws IOException If PID dirs could not be created
+   */
+  public static void setupPidDirs(File procfsRootDir, String[] pids) 
+                      throws IOException {
+    for (String pid : pids) {
+      File pidDir = new File(procfsRootDir, pid);
+      pidDir.mkdir();
+      if (!pidDir.exists()) {
+        throw new IOException ("couldn't make process directory under " +
+            "fake procfs");
+      } else {
+        LOG.info("created pid dir");
+      }
+    }
+  }
+  
+  /**
+   * Write stat files under the specified pid directories with data
+   * setup in the corresponding ProcessStatInfo objects
+   * @param procfsRootDir root directory of procfs file system
+   * @param pids the PID directories under which to create the stat file
+   * @param procs corresponding ProcessStatInfo objects whose data should be
+   *              written to the stat files.
+   * @throws IOException if stat files could not be written
+   */
+  public static void writeStatFiles(File procfsRootDir, String[] pids, 
+                              ProcessStatInfo[] procs) throws IOException {
+    for (int i=0; i<pids.length; i++) {
+      File statFile = new File(new File(procfsRootDir, pids[i]), "stat");
+      BufferedWriter bw = null;
+      try {
+        FileWriter fw = new FileWriter(statFile);
+        bw = new BufferedWriter(fw);
+        bw.write(procs[i].getStatLine());
+        LOG.info("wrote stat file for " + pids[i] + 
+                  " with contents: " + procs[i].getStatLine());
+      } finally {
+        // not handling exception - will throw an error and fail the test.
+        if (bw != null) {
+          bw.close();
+        }
+      }
+    }
+  }
 }
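
For readability outside the test fixture: the snippet below reproduces the fake /proc/<pid>/stat line that ProcessStatInfo.getStatLine() writes for the first test process (pid 100, parent 1, vmem 100000). All unused numerical fields are zero, and the virtual memory size lands in field 23, matching the vsize position in a real procfs stat line. The class name is illustrative only:

public class FakeStatLineSketch {
  public static void main(String[] args) {
    // Same format string as ProcessStatInfo.getStatLine() above.
    String line = String.format("%s (%s) S %s %s %s 0 0 0" +
        " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s 0 0 0" +
        " 0 0 0 0 0 0 0 0" +
        " 0 0 0 0 0",
        "100", "proc1", "1", "100", "100", "100000");
    System.out.println(line);
    // -> "100 (proc1) S 1 100 100 0 ... 0 100000 0 ... 0"
  }
}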


