hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From psdc1978 <psdc1...@gmail.com>
Subject FetchOutputs method understanding.
Date Thu, 27 May 2010 20:29:28 GMT
Hi,

I'm looking at the method fetchOutputs in ReduceTask.java, but there's
a part of the method that I don't understand. Inside the method, in the
synchronized (scheduledCopies) {...} block, I don't understand what's
happening inside the curly brackets. What's the purpose of this part of the code?

Here's the method:

[code]

    /**
     * Excerpt of the copy loop that fetches map outputs. Each pass of the
     * outer loop (1) puts previously failed fetches back at the head of
     * their per-host lists, (2) schedules at most one pending map output
     * per eligible host, and (3) processes copy results, updating progress
     * on success and failure counters plus the penalty box on failure.
     * The outer loop runs while copiedMapOutputs.size() is below numMaps
     * and mergeThrowable is null.
     *
     * Parts of the method are elided in this excerpt and marked with
     * (...); the return statement lies in an elided part.
     *
     * @throws IOException declared by the signature; the throwing site is
     *         not visible in this excerpt
     */
    public boolean fetchOutputs() throws IOException {
     (...)

        // loop until we get all required outputs
        while (copiedMapOutputs.size() < numMaps && mergeThrowable == null)
{

          (...)
          // Put the hash entries for the failed fetches.
          Iterator<MapOutputLocation> locItr = retryFetches.iterator();

          while (locItr.hasNext()) {
            MapOutputLocation loc = locItr.next();
            List<MapOutputLocation> locList =
mapLocations.get(loc.getHost());

            // Check if the list exists. Map output location mapping is
cleared
            // once the jobtracker restarts and is rebuilt from scratch.
            // Note that map-output-location mapping will be recreated and
hence
            // we continue with the hope that we might find some locations
            // from the rebuild map.
            if (locList != null) {
              // Add to the beginning of the list so that this map is
              //tried again before the others and we can hasten the
              //re-execution of this map should there be a problem
              locList.add(0, loc);
            }
          }

          if (retryFetches.size() > 0) {
            LOG.info(reduceTask.getTaskID() + ": " +
                  "Got " + retryFetches.size() +
                  " map-outputs from previous failures");
          }
          // clear the "failed" fetches hashmap
          retryFetches.clear();

          // now walk through the cache and schedule what we can
          int numScheduled = 0;
          int numDups = 0;


          #### HERE ########
          Q1: I don't understand what is going on inside this syncronized?
What's the purpose of this code?
          // Scheduling step. While holding the scheduledCopies monitor,
          // this block:
          //   1. copies the host keys of mapLocations into a list and
          //      shuffles it with this.random;
          //   2. skips a host when it has no known outputs, when it is
          //      already in uniqueHosts (its outputs are counted in
          //      numDups), or when its penaltyBox deadline is still in
          //      the future;
          //   3. for every remaining host, moves the first non-obsolete
          //      MapOutputLocation from the per-host list into
          //      scheduledCopies, so each host gets at most one new
          //      fetch per pass;
          //   4. calls notifyAll() on scheduledCopies once the pass is
          //      done.
          // NOTE(review): the threads that wait on scheduledCopies are not
          // part of this excerpt; presumably the copier threads block on
          // it until locations are queued -- confirm in ReduceTask.java.
          synchronized (scheduledCopies) {

            // Randomize the map output locations to prevent
            // all reduce-tasks swamping the same tasktracker
            List<String> hostList = new ArrayList<String>();
            hostList.addAll(mapLocations.keySet());

            Collections.shuffle(hostList, this.random);

            Iterator<String> hostsItr = hostList.iterator();

            while (hostsItr.hasNext()) {

              String host = hostsItr.next();

              List<MapOutputLocation> knownOutputsByLoc =
                mapLocations.get(host);

              // Check if the list exists. Map output location mapping is
              // cleared once the jobtracker restarts and is rebuilt from
              // scratch.
              // Note that map-output-location mapping will be recreated and

              // hence we continue with the hope that we might find some
              // locations from the rebuild map and add then for fetching.
              if (knownOutputsByLoc == null || knownOutputsByLoc.size() ==
0) {
                continue;
              }

              //Identify duplicate hosts here
              if (uniqueHosts.contains(host)) {
                 numDups += knownOutputsByLoc.size();
                 continue;
              }

              // Skip hosts whose penalty deadline has not passed yet; an
              // expired entry is removed so the host is eligible again.
              Long penaltyEnd = penaltyBox.get(host);
              boolean penalized = false;

              if (penaltyEnd != null) {
                if (currentTime < penaltyEnd.longValue()) {
                  penalized = true;
                } else {
                  penaltyBox.remove(host);
                }
              }

              if (penalized)
                continue;

              // Hold the monitor of this per-host list while iterating over
              // it and removing entries.
              // NOTE(review): other code that touches these lists is outside
              // this excerpt -- verify which threads share them.
              synchronized (knownOutputsByLoc) {

                locItr = knownOutputsByLoc.iterator();

                while (locItr.hasNext()) {

                  MapOutputLocation loc = locItr.next();

                  // Do not schedule fetches from OBSOLETE maps
                  if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
                    locItr.remove();
                    continue;
                  }

                  uniqueHosts.add(host);
                  scheduledCopies.add(loc);
                  locItr.remove();  // remove from knownOutputs
                  numInFlight++; numScheduled++;

                  break; //we have a map from this host
                }
              }
            }
            scheduledCopies.notifyAll();
          }

          // NOTE(review): the message below has no space between "and" and
          // the dup count, so the two run together in the log output.
          if (numScheduled > 0 || logNow) {
            LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
                   " outputs (" + penaltyBox.size() +
                   " slow hosts and" + numDups + " dup hosts)");
          }

          if (penaltyBox.size() > 0 && logNow) {
            LOG.info("Penalized(slow) Hosts: ");
            for (String host : penaltyBox.keySet()) {
              LOG.info(host + " Will be considered after: " +
                  ((penaltyBox.get(host) - currentTime)/1000) + "
seconds.");
            }
          }

          // if we have no copies in flight and we can't schedule anything
          // new, just wait for a bit
          // NOTE(review): an InterruptedException raised by the sleep is
          // swallowed below and the interrupt flag is not restored.
          try {
            if (numInFlight == 0 && numScheduled == 0) {
              // we should indicate progress as we don't want TT to think
              // we're stuck and kill us
              reporter.progress();
              Thread.sleep(5000);
            }
          } catch (InterruptedException e) { } // IGNORE

          // Process copy results while fetches are outstanding and no merge
          // error has been recorded; a null result ends this inner loop.
          while (numInFlight > 0 && mergeThrowable == null) {
            LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
                      numInFlight);
            //the call to getCopyResult will either
            //1) return immediately with a null or a valid CopyResult
object,
            //                 or
            //2) if the numInFlight is above maxInFlight, return with a
            //   CopyResult object after getting a notification from a
            //   fetcher thread,
            //So, when getCopyResult returns null, we can be sure that
            //we aren't busy enough and we should go and get more
mapcompletion
            //events from the tasktracker
            CopyResult cr = getCopyResult(numInFlight);

            if (cr == null) {
              break;
            }

            if (cr.getSuccess()) {  // a successful copy
              numCopied++;
              lastProgressTime = System.currentTimeMillis();
              reduceShuffleBytes.increment(cr.getSize());

              long secsSinceStart =
                (System.currentTimeMillis()-startTime)/1000+1;
              float mbs =
((float)reduceShuffleBytes.getCounter())/(1024*1024);
              float transferRate = mbs/secsSinceStart;

              copyPhase.startNextPhase();
              copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
                                  + " at " +
                                  mbpsFormat.format(transferRate) +  "
MB/s)");

              // Note successful fetch for this mapId to invalidate
              // (possibly) old fetch-failures
              fetchFailedMaps.remove(cr.getLocation().getTaskId());
            } else if (cr.isObsolete()) {
              //ignore
              LOG.info(reduceTask.getTaskID() +
                       " Ignoring obsolete copy result for Map Task: " +
                       cr.getLocation().getTaskAttemptId() + " from host: "
+
                       cr.getHost());
            } else {
              retryFetches.add(cr.getLocation());

              // note the failed-fetch
              TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
              TaskID mapId = cr.getLocation().getTaskId();

              totalFailures++;
              Integer noFailedFetches =
                mapTaskToFailedFetchesMap.get(mapTaskId);
              noFailedFetches =
                (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
              mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
              LOG.info("Task " + getTaskID() + ": Failed fetch #" +
                       noFailedFetches + " from " + mapTaskId);

              // did the fetch fail too many times?
              // using a hybrid technique for notifying the jobtracker.
              //   a. the first notification is sent after max-retries
              //   b. subsequent notifications are sent after 2 retries.
              if ((noFailedFetches >= maxFetchRetriesPerMap)
                  && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
                synchronized (ReduceTask.this) {
                  taskStatus.addFetchFailedMap(mapTaskId);
                  LOG.info("Failed to fetch map-output from " + mapTaskId +
                           " even after MAX_FETCH_RETRIES_PER_MAP retries...
"
                           + " reporting to the JobTracker");
                }
              }
              // note unique failed-fetch maps
              if (noFailedFetches == maxFetchRetriesPerMap) {
                fetchFailedMaps.add(mapId);

                // did we have too many unique failed-fetch maps?
                // and did we fail on too many fetch attempts?
                // and did we progress enough
                //     or did we wait for too long without any progress?

                // check if the reducer is healthy
                boolean reducerHealthy =
                    (((float)totalFailures / (totalFailures + numCopied))
                     < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);

                // check if the reducer has progressed enough
                boolean reducerProgressedEnough =
                    (((float)numCopied / numMaps)
                     >= MIN_REQUIRED_PROGRESS_PERCENT);

                // check if the reducer is stalled for a long time
                // duration for which the reducer is stalled
                int stallDuration =
                    (int)(System.currentTimeMillis() - lastProgressTime);
                // duration for which the reducer ran with progress
                int shuffleProgressDuration =
                    (int)(lastProgressTime - startTime);
                // min time the reducer should run without getting killed
                int minShuffleRunDuration =
                    (shuffleProgressDuration > maxMapRuntime)
                    ? shuffleProgressDuration
                    : maxMapRuntime;
                boolean reducerStalled =
                    (((float)stallDuration / minShuffleRunDuration)
                     >= MAX_ALLOWED_STALL_TIME_PERCENT);

                // kill if not healthy and has insufficient progress
                if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
                     fetchFailedMaps.size() == (numMaps -
copiedMapOutputs.size()))
                    && !reducerHealthy
                    && (!reducerProgressedEnough || reducerStalled)) {
                  LOG.fatal("Shuffle failed with too many fetch failures " +

                            "and insufficient progress!" +
                            "Killing task " + getTaskID() + ".");
                  umbilical.shuffleError(getTaskID(),
                                         "Exceeded
MAX_FAILED_UNIQUE_FETCHES;"
                                         + " bailing-out.");
                }
              }

              // back off exponentially until num_retries <= max_retries
              // back off by max_backoff/2 on subsequent failed attempts
              // currentBackOff is in milliseconds: it is added to a
              // currentTimeMillis() value and logged divided by 1000 as
              // seconds.
              currentTime = System.currentTimeMillis();
              int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
                                   ? BACKOFF_INIT
                                     * (1 << (noFailedFetches - 1))
                                   : (this.maxBackoff * 1000 / 2);
              penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
              LOG.warn(reduceTask.getTaskID() + " adding host " +
                       cr.getHost() + " to penalty box, next contact in " +
                       (currentBackOff/1000) + " seconds");
            }
            // Whatever the outcome, the host leaves uniqueHosts so that a
            // later scheduling pass no longer treats it as a duplicate, and
            // the in-flight count drops by one.
            uniqueHosts.remove(cr.getHost());
            numInFlight--;
          }
        }

        (..)
    }


[/code]


Thanks,
-- 
Pedro

Mime
View raw message