giraph-commits mailing list archives

From: ereis...@apache.org
Subject: git commit: updated refs/heads/trunk to 3a20c55
Date: Wed, 06 Nov 2013 01:05:38 GMT
Updated Branches:
  refs/heads/trunk b8de740e2 -> 3a20c5597


GIRAPH-737: Giraph Application Master: Move to new and stable YARN API (mislam via ereisman)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/3a20c559
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/3a20c559
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/3a20c559

Branch: refs/heads/trunk
Commit: 3a20c5597c4f7ca67421eb39fd426cc8b0cbef2d
Parents: b8de740
Author: Eli Reisman <ereisman@apache.org>
Authored: Tue Nov 5 16:50:59 2013 -0800
Committer: Eli Reisman <ereisman@apache.org>
Committed: Tue Nov 5 16:50:59 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 README                                          |   4 +
 findbugs-exclude.xml                            |   4 +
 .../giraph/comm/netty/SaslNettyServer.java      |  22 +-
 .../comm/netty/handler/SaslServerHandler.java   |   8 +-
 .../giraph/yarn/GiraphApplicationMaster.java    | 751 ++++++++++---------
 .../apache/giraph/yarn/GiraphYarnClient.java    | 177 +++--
 .../java/org/apache/giraph/yarn/YarnUtils.java  |  13 +-
 pom.xml                                         |  94 +--
 9 files changed, 583 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d970fa3..04656f0 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-737: Giraph Application Master: Move to new and stable YARN API (mislam via ereisman)
+
   GIRAPH-791: HiveGiraphRunner picks -D options too late (majakabiljo)
 
   GIRAPH-790: Add a way to automatically retry a job (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/README
----------------------------------------------------------------------
diff --git a/README b/README
index fd9ec20..a2d5dc0 100644
--- a/README
+++ b/README
@@ -50,6 +50,10 @@ explicitly specify it with "mvn -Phadoop_0.20.203 <goals>".
 
   You may tell maven to use this version with "mvn -Phadoop_0.23 <goals>".
 
+- Apache Hadoop Yarn with 2.2.0
+
+  You may tell maven to use this version with "mvn -Phadoop_yarn -Dhadoop.version=2.2.0 <goals>".
+
 - Apache Hadoop 3.0.0-SNAPSHOT
 
   You may tell maven to use this version with "mvn -Phadoop_trunk <goals>".

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 21aa4ef..e0466f7 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -43,6 +43,10 @@
     <Bug pattern="DM_EXIT"/>
   </Match>
   <Match>
+    <Class name="org.apache.giraph.yarn.GiraphYarnTask$OverrideExceptionHandler"/>
+    <Bug pattern="DM_EXIT"/>
+  </Match>
+  <Match>
     <Class name="org.apache.giraph.worker.BspServiceWorker"/>
     <Bug pattern="DM_EXIT"/>
   </Match>
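
The new exclusion above references org.apache.giraph.yarn.GiraphYarnTask$OverrideExceptionHandler, whose source is not part of this diff. Presumably it is an uncaught-exception handler that deliberately calls System.exit(), which is exactly what FindBugs' DM_EXIT detector flags. A hypothetical sketch of such a handler follows; the class name and log message are illustrative only, not the actual Giraph code:

import org.apache.log4j.Logger;

public class UncaughtExitHandlerSketch implements Thread.UncaughtExceptionHandler {
  private static final Logger LOG =
    Logger.getLogger(UncaughtExitHandlerSketch.class);

  @Override
  public void uncaughtException(Thread t, Throwable e) {
    // Log the failure, then kill the JVM so YARN marks the container as failed.
    LOG.fatal("uncaughtException: thread " + t.getName() + " died", e);
    System.exit(1); // the direct JVM exit is what DM_EXIT flags
  }
}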

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
index 00a802f..9039141 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
@@ -40,6 +40,7 @@ import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 
 /**
  * Encapsulates SASL server logic for Giraph BSP worker servers.
@@ -59,9 +60,26 @@ public class SaslNettyServer extends SaslRpcServer {
    *
    * @param secretManager supplied by SaslServerHandler.
    */
-  public SaslNettyServer(JobTokenSecretManager secretManager) {
+  public SaslNettyServer(JobTokenSecretManager secretManager)
+    throws IOException {
+    this(secretManager, AuthMethod.SIMPLE);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param secretManager supplied by SaslServerHandler.
+   * @param authMethod Authentication method
+   */
+  public SaslNettyServer(JobTokenSecretManager secretManager,
+    AuthMethod authMethod) throws IOException {
+/*if[HADOOP_1_SECRET_MANAGER]
+else[HADOOP_1_SECRET_MANAGER]*/
+    super(authMethod);
+/*end[HADOOP_1_SECRET_MANAGER]*/
     if (LOG.isDebugEnabled()) {
-      LOG.debug("SaslNettyServer: Secret manager is: " + secretManager);
+      LOG.debug("SaslNettyServer: Secret manager is: " + secretManager +
+        " with authmethod " + authMethod);
     }
 /*if[HADOOP_1_SECRET_MANAGER]
 else[HADOOP_1_SECRET_MANAGER]*/

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
index 922f373..da06334 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
@@ -38,6 +38,7 @@ import org.apache.log4j.Logger;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -108,7 +109,12 @@ public class SaslServerHandler extends
           LOG.debug("No saslNettyServer for " + ctx.getChannel() +
               " yet; creating now, with secret manager: " + secretManager);
         }
-        saslNettyServer = new SaslNettyServer(secretManager);
+        try {
+          saslNettyServer = new SaslNettyServer(secretManager,
+            AuthMethod.SIMPLE);
+        } catch (IOException ioe) { //TODO:
+          throw new RuntimeException(ioe);
+        }
         NettyServer.CHANNEL_SASL_NETTY_SERVERS.set(ctx.getChannel(),
             saslNettyServer);
       } else {

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
index c2b88a0..72978bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphApplicationMaster.java
@@ -18,56 +18,50 @@
 package org.apache.giraph.yarn;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 
 import com.google.common.collect.Maps;
-import java.security.PrivilegedAction;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-  .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.protocolrecords
-  .RegisterApplicationMasterRequest;
+  .RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -98,15 +92,21 @@ public class GiraphApplicationMaster {
    * the YarnClient. We will need to export this resource to the tasks also.
    * Construct the HEARTBEAT to use to ping the RM about job progress/health.
    */
+  //TODO: status updates for clients are not yet implemented.
+  /**
+   * Hostname of the container
+   */
+  private String appMasterHostname = "";
+  /** Port on which the app master listens for status updates from clients*/
+  private int appMasterRpcPort = 0;
+  /** Tracking url to which app master publishes info for clients to monitor*/
+  private String appMasterTrackingUrl = "";
+
   static {
     // pick up new conf XML file and populate it with stuff exported from client
     Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
   }
 
-  /** Handle to AppMaster's RPC connection to YARN and the RM. */
-  private final AMRMProtocol resourceManager;
-  /** bootstrap handle to YARN RPC service */
-  private final YarnRPC rpc;
   /** GiraphApplicationMaster's application attempt id */
   private final ApplicationAttemptId appAttemptId;
   /** GiraphApplicationMaster container id. Leave me here, I'm very useful */
@@ -117,6 +117,8 @@ public class GiraphApplicationMaster {
   private final int heapPerContainer;
   /** Giraph configuration for this job, transported here by YARN framework */
   private final ImmutableClassesGiraphConfiguration giraphConf;
+  /** Yarn configuration for this job*/
+  private final YarnConfiguration yarnConf;
   /** Completed Containers Counter */
   private final AtomicInteger completedCount;
   /** Failed Containers Counter */
@@ -127,12 +129,20 @@ public class GiraphApplicationMaster {
   private final AtomicInteger successfulCount;
   /** the ACK #'s for AllocateRequests + heartbeats == last response # */
   private AtomicInteger lastResponseId;
+  /** Buffer to store all tokens */
+  private ByteBuffer allTokens;
   /** Executor to attempt asynchronous launches of Giraph containers */
   private ExecutorService executor;
   /** YARN progress is a <code>float</code> between 0.0f and 1.0f */
-  private float progress;
-  /** An empty resource request with which to send heartbeats + progress */
-  private AllocateRequest heartbeat;
+  /** Handle to communicate with the Resource Manager */
+  @SuppressWarnings("rawtypes")
+  private AMRMClientAsync amRMClient;
+  /** Handle to communicate with the Node Manager */
+  private NMClientAsync nmClientAsync;
+  /** Listen to process the response from the Node Manager */
+  private NMCallbackHandler containerListener;
+  /** Whether all containers have finished */
+  private volatile boolean done;
 
   /**
    * Construct the GiraphAppMaster, populate fields using env vars
@@ -144,359 +154,221 @@ public class GiraphApplicationMaster {
     throws IOException {
     containerId = cId; // future good stuff will need me to operate.
     appAttemptId = aId;
-    progress = 0.0f;
     lastResponseId = new AtomicInteger(0);
     giraphConf =
       new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
+    yarnConf = new YarnConfiguration(giraphConf);
     completedCount = new AtomicInteger(0);
     failedCount = new AtomicInteger(0);
     allocatedCount = new AtomicInteger(0);
     successfulCount = new AtomicInteger(0);
-    rpc = YarnRPC.create(giraphConf);
-    resourceManager = getHandleToRm();
     containersToLaunch = giraphConf.getMaxWorkers() + 1;
     executor = Executors.newFixedThreadPool(containersToLaunch);
     heapPerContainer = giraphConf.getYarnTaskHeapMb();
+    LOG.info("GiraphAM  for ContainerId " + cId + " ApplicationAttemptId " +
+      aId);
   }
 
   /**
    * Coordinates all requests for Giraph's worker/master task containers, and
    * manages application liveness heartbeat, completion status, teardown, etc.
+   * @return success or failure
    */
-  private void run() {
-    // register Application Master with the YARN Resource Manager so we can
-    // begin requesting resources. The response contains useful cluster info.
-    try {
-      resourceManager.registerApplicationMaster(getRegisterAppMasterRequest());
-    } catch (IOException ioe) {
-      throw new IllegalStateException(
-        "GiraphApplicationMaster failed to register with RM.", ioe);
-    }
-
+  private boolean run() throws YarnException, IOException {
+    boolean success = false;
     try {
-      // make the request only ONCE; only request more on container failure etc.
-      AMResponse amResponse = sendAllocationRequest();
-      logClusterResources(amResponse);
-      // loop here, waiting for TOTAL # REQUESTED containers to be available
-      // and launch them piecemeal they are reported to us in heartbeat pings.
-      launchContainersAsynchronously(amResponse);
-      // wait for the containers to finish & tally success/fails
-      awaitJobCompletion(); // all launched tasks are done before complete call
+      getAllTokens();
+      registerRMCallBackHandler();
+      registerNMCallbackHandler();
+      registerAMToRM();
+      madeAllContainerRequestToRM();
+      LOG.info("Wait to finish ..");
+      while (!done) {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException ex) {
+          LOG.error(ex);
+          //TODO:
+        }
+      }
+      LOG.info("Done " + done);
     } finally {
       // if we get here w/o problems, the executor is already long finished.
       if (null != executor && !executor.isTerminated()) {
+        LOG.info("Forcefully terminating executors with done =:" + done);
         executor.shutdownNow(); // force kill, especially if got here by throw
       }
-      // When the application completes, it should send a "finish request" to RM
-      try {
-        resourceManager.finishApplicationMaster(buildFinishAppMasterRequest());
-      } catch (YarnRemoteException yre) {
-        LOG.error("GiraphApplicationMaster failed to un-register with RM", yre);
-      }
-      if (null != rpc) {
-        rpc.stopProxy(resourceManager, giraphConf);
-      }
+      success = finish();
     }
+    return success;
   }
 
   /**
-   * Reports the cluster resources in the AM response to our initial ask.
-   * @param amResponse the AM response from YARN.
+   * Call when the application is done
+   * @return if all containers succeed
    */
-  private void logClusterResources(final AMResponse amResponse) {
-    // Check what the current available resources in the cluster are
-    Resource availableResources = amResponse.getAvailableResources();
-    LOG.info("Initial Giraph resource request for " + containersToLaunch +
-      " containers has been submitted. " +
-      "The RM reports cluster headroom is: " + availableResources);
-  }
+  private boolean finish() {
+    // When the application completes, it should stop all running containers
+    LOG.info("Application completed. Stopping running containers");
+    nmClientAsync.stop();
 
-  /**
-   * Utility to build the final "job run is finished" request to the RM.
-   * @return the finish app master request, to send to the RM.
-   */
-  private FinishApplicationMasterRequest buildFinishAppMasterRequest() {
+    // When the application completes, it should send a finish application
+    // signal to the RM
     LOG.info("Application completed. Signalling finish to RM");
-    FinishApplicationMasterRequest finishRequest =
-      Records.newRecord(FinishApplicationMasterRequest.class);
-    finishRequest.setAppAttemptId(appAttemptId);
     FinalApplicationStatus appStatus;
-    String appMessage = "Container Diagnostics: " +
-      " allocated=" + allocatedCount.get() +
-      ", completed=" + completedCount.get() +
-      ", succeeded=" + successfulCount.get() +
-      ", failed=" + failedCount.get();
-    if (successfulCount.get() == containersToLaunch) {
+    String appMessage = null;
+    boolean success = true;
+    if (failedCount.get() == 0 &&
+        completedCount.get() == containersToLaunch) {
       appStatus = FinalApplicationStatus.SUCCEEDED;
     } else {
       appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + containersToLaunch +
+        ", completed=" + completedCount.get() +  ", failed=" +
+        failedCount.get();
+      success = false;
+    }
+    try {
+      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnException ex) {
+      LOG.error("Failed to unregister application", ex);
+    } catch (IOException e) {
+      LOG.error("Failed to unregister application", e);
     }
-    finishRequest.setDiagnostics(appMessage);
-    finishRequest.setFinishApplicationStatus(appStatus);
-    return finishRequest;
-  }
 
+    amRMClient.stop();
+    return success;
+  }
   /**
-   * Loop and check the status of the containers until all are finished,
-   * logging how each container meets its end: success, error, or abort.
+   * Add all containers' request
+   * @return
    */
-  private void awaitJobCompletion() {
-    List<ContainerStatus> completedContainers;
-    do {
-      try {
-        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
-      } catch (InterruptedException ignored) {
-        final int notFinished = containersToLaunch - completedCount.get();
-        LOG.info("GiraphApplicationMaster interrupted from sleep while " +
-          " waiting for " + notFinished + "containers to finish job.");
-      }
-      updateProgress();
-      completedContainers =
-          sendHeartbeat().getAMResponse().getCompletedContainersStatuses();
-      for (ContainerStatus containerStatus : completedContainers) {
-        LOG.info("Got container status for containerID= " +
-          containerStatus.getContainerId() +
-          ", state=" + containerStatus.getState() +
-          ", exitStatus=" + containerStatus.getExitStatus() +
-          ", diagnostics=" + containerStatus.getDiagnostics());
-        switch (containerStatus.getExitStatus()) {
-        case YARN_SUCCESS_EXIT_STATUS:
-          successfulCount.incrementAndGet();
-          break;
-        case YARN_ABORT_EXIT_STATUS:
-          break; // not success or fail
-        default:
-          failedCount.incrementAndGet();
-          break;
-        }
-        completedCount.incrementAndGet();
-      } // end completion check loop
-    } while (completedCount.get() < containersToLaunch);
+  private void madeAllContainerRequestToRM() {
+    // Setup ask for containers from RM
+    // Send request for containers to RM
+    // Until we get our fully allocated quota, we keep on polling RM for
+    // containers
+    // Keep looping until all the containers are launched and shell script
+    // executed on them ( regardless of success/failure).
+    for (int i = 0; i < containersToLaunch; ++i) {
+      ContainerRequest containerAsk = setupContainerAskForRM();
+      amRMClient.addContainerRequest(containerAsk);
+    }
   }
 
-  /** Update the progress value for our next heartbeat (allocate request) */
-  private void updateProgress() {
-    // set progress to "half done + ratio of completed containers so far"
-    final float ratio = completedCount.get() / (float) containersToLaunch;
-    progress = 0.5f + ratio / 2.0f;
-  }
+   /**
+    * Setup the request that will be sent to the RM for the container ask.
+    *
+    * @return the setup ResourceRequest to be sent to RM
+    */
+  private ContainerRequest setupContainerAskForRM() {
+    // setup requirements for hosts
+    // using * as any host will do for the distributed shell app
+    // set the priority for the request
+    Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide?
+    pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
 
-  /**
-   * Loop while checking container request status, adding each new bundle of
-   * containers allocated to our executor to launch (run Giraph BSP task) the
-   * job on each. Giraph's full resource request was sent ONCE, but these
-   * containers will become available in groups, over a period of time.
-   * @param amResponse metadata about our AllocateRequest's results.
-   */
-  private void launchContainersAsynchronously(AMResponse amResponse) {
-    List<Container> allocatedContainers;
-    do {
-      // get fresh report on # alloc'd containers, sleep between checks
-      if (null == amResponse) {
-        amResponse = sendHeartbeat().getAMResponse();
-      }
-      allocatedContainers = amResponse.getAllocatedContainers();
-      allocatedCount.addAndGet(allocatedContainers.size());
-      LOG.info("Waiting for task containers: " + allocatedCount.get() +
-        " allocated out of " + containersToLaunch + " required.");
-      startContainerLaunchingThreads(allocatedContainers);
-      amResponse = null;
-      try {
-        Thread.sleep(SLEEP_BETWEEN_HEARTBEATS_MSECS);
-      } catch (InterruptedException ignored) {
-        LOG.info("launchContainerAsynchronously() raised InterruptedException");
-      }
-    } while (containersToLaunch > allocatedCount.get());
-  }
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(heapPerContainer);
 
-  /**
-   * For each container successfully allocated, attempt to set up and launch
-   * a Giraph worker/master task.
-   * @param allocatedContainers the containers we have currently allocated.
-   */
-  private void startContainerLaunchingThreads(final List<Container>
-    allocatedContainers) {
-    progress = allocatedCount.get() / (2.0f * containersToLaunch);
-    int placeholder = 0;
-    for (Container allocatedContainer : allocatedContainers) {
-      LOG.info("Launching shell command on a new container." +
-        ", containerId=" + allocatedContainer.getId() +
-        ", containerNode=" + allocatedContainer.getNodeId().getHost() +
-        ":" + allocatedContainer.getNodeId().getPort() +
-        ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
-        ", containerState=" + allocatedContainer.getState() +
-        ", containerResourceMemory=" +
-        allocatedContainer.getResource().getMemory());
-      // Launch and start the container on a separate thread to keep the main
-      // thread unblocked as all containers may not be allocated at one go.
-      LaunchContainerRunnable launchThread =
-        new LaunchContainerRunnable(allocatedContainer, heapPerContainer);
-      executor.execute(launchThread);
-    }
+    ContainerRequest request = new ContainerRequest(capability, null, null,
+      pri);
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
   }
 
   /**
-   * Sends heartbeat messages that include progress amounts. These are in the
-   * form of a YARN AllocateRequest object that asks for 0 resources.
-   * @return the AllocateResponse, which we may or may not need.
+   * Populate allTokens with the tokens received
+   * @return
    */
-  private AllocateResponse sendHeartbeat() {
-    heartbeat.setProgress(progress);
-    heartbeat.setResponseId(lastResponseId.incrementAndGet());
-    AllocateResponse allocateResponse = null;
-    try {
-      allocateResponse = resourceManager.allocate(heartbeat);
-      final int responseId = allocateResponse.getAMResponse().getResponseId();
-      if (responseId != lastResponseId.get()) {
-        lastResponseId.set(responseId);
+  private void getAllTokens() throws IOException {
+    Credentials credentials = UserGroupInformation.getCurrentUser()
+        .getCredentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    // Now remove the AM->RM token so that containers cannot access it.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Token type :" + token.getKind());
       }
-      checkForRebootFlag(allocateResponse.getAMResponse());
-      return allocateResponse;
-    } catch (YarnRemoteException yre) {
-      throw new IllegalStateException("sendHeartbeat() failed with " +
-        "YarnRemoteException: ", yre);
-    }
-  }
-
-  /**
-   * Compose and send the allocation request for our Giraph BSP worker/master
-   * compute nodes. Right now the requested containers are identical, mirroring
-   * Giraph's behavior when running on Hadoop MRv1. Giraph could use YARN
-   * to set fine-grained capability to each container, including host choice.
-   * @return The AM resource descriptor with our container allocations.
-   */
-  private AMResponse sendAllocationRequest() {
-    AllocateRequest allocRequest = Records.newRecord(AllocateRequest.class);
-    try {
-      List<ResourceRequest> containerList = buildResourceRequests();
-      allocRequest.addAllAsks(containerList);
-      List<ContainerId> releasedContainers = Lists.newArrayListWithCapacity(0);
-      allocRequest.setResponseId(lastResponseId.get());
-      allocRequest.setApplicationAttemptId(appAttemptId);
-      allocRequest.addAllReleases(releasedContainers);
-      allocRequest.setProgress(progress);
-      AllocateResponse allocResponse = resourceManager.allocate(allocRequest);
-      AMResponse amResponse = allocResponse.getAMResponse();
-      if (amResponse.getResponseId() != lastResponseId.get()) {
-        lastResponseId.set(amResponse.getResponseId());
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
       }
-      checkForRebootFlag(amResponse);
-      // now, make THIS our new HEARTBEAT object, but with ZERO new requests!
-      initHeartbeatRequestObject(allocRequest);
-      return amResponse;
-    } catch (YarnRemoteException yre) {
-      throw new IllegalStateException("Giraph Application Master could not " +
-        "successfully allocate the specified containers from the RM.", yre);
     }
+    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
   }
 
   /**
-   * If the YARN RM gets way out of sync with our App Master, its time to
-   * fail the job/restart. This should trigger the job end and cleanup.
-   * @param amResponse RPC response from YARN RM to check for reboot flag.
-   */
-  private void checkForRebootFlag(AMResponse amResponse) {
-    if (amResponse.getReboot()) {
-      LOG.error("AMResponse: " + amResponse + " raised YARN REBOOT FLAG!");
-      throw new RuntimeException("AMResponse " + amResponse +
-        " signaled GiraphApplicationMaster with REBOOT FLAG. Failing job.");
-    }
-  }
-
-
-  /**
-   * Reuses the initial container request (switched to "0 asks" so no new allocs
-   * occur) and sends all heartbeats using that request object.
-   * @param allocRequest the allocation request object to use as heartbeat.
+   * Register RM callback and start listening
+   * @return
    */
-  private void initHeartbeatRequestObject(AllocateRequest allocRequest) {
-    allocRequest.clearAsks();
-    allocRequest.addAllAsks(Lists.<ResourceRequest>newArrayListWithCapacity(0));
-    heartbeat = allocRequest;
+  private void registerRMCallBackHandler() {
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000,
+      allocListener);
+    amRMClient.init(yarnConf);
+    amRMClient.start();
   }
 
   /**
-   * Utility to construct the ResourceRequest for our resource ask: all the
-   * Giraph containers we need, and their memory/priority requirements.
-   * @return a list of ResourceRequests to send (just one, for Giraph tasks)
+   * Register NM callback and start listening
+   * @return
    */
-  private List<ResourceRequest> buildResourceRequests() {
-    // set up resource request for our Giraph BSP application
-    ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
-    resourceRequest.setHostName("*"); // hand pick our worker locality someday
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
-    resourceRequest.setPriority(pri);
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setVirtualCores(1); // new YARN API, won't work version < 2.0.3
-    capability.setMemory(heapPerContainer);
-    resourceRequest.setCapability(capability);
-    resourceRequest.setNumContainers(containersToLaunch);
-    return ImmutableList.of(resourceRequest);
+  private void registerNMCallbackHandler() {
+    containerListener = new NMCallbackHandler();
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
+    nmClientAsync.init(yarnConf);
+    nmClientAsync.start();
   }
-
   /**
-   * Obtain handle to RPC connection to Resource Manager.
-   * @return the AMRMProtocol handle to YARN RPC.
+   * Register AM to RM
+   * @return AM register response
    */
-  private AMRMProtocol getHandleToRm() {
-    YarnConfiguration yarnConf = new YarnConfiguration(giraphConf);
-    final InetSocketAddress rmAddress = yarnConf.getSocketAddr(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    if (UserGroupInformation.isSecurityEnabled()) {
-      UserGroupInformation currentUser;
-      try {
-        currentUser = UserGroupInformation.getCurrentUser();
-      } catch (IOException ioe) {
-        throw new IllegalStateException("Could not obtain UGI for user.", ioe);
-      }
-      String tokenURLEncodedStr = System.getenv(
-        ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-      try {
-        token.decodeFromUrlString(tokenURLEncodedStr);
-      } catch (IOException ioe) {
-        throw new IllegalStateException("Could not decode token from URL", ioe);
-      }
-      SecurityUtil.setTokenService(token, rmAddress);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AppMasterToken is: " + token);
+  private RegisterApplicationMasterResponse registerAMToRM()
+    throws YarnException {
+    // register Application Master with the YARN Resource Manager so we can
+    // begin requesting resources.
+    try {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LOG.info("SECURITY ENABLED ");
       }
-      currentUser.addToken(token);
-      return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
-        @Override
-        public AMRMProtocol run() {
-          return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
-            rmAddress, giraphConf);
-        }
-      });
-    } else { // non-secure
-      return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
-        rmAddress, yarnConf);
+      // TODO: provide actual call back details
+      RegisterApplicationMasterResponse response = amRMClient
+        .registerApplicationMaster(appMasterHostname
+        , appMasterRpcPort, appMasterTrackingUrl);
+      return response;
+    } catch (IOException ioe) {
+      throw new IllegalStateException(
+        "GiraphApplicationMaster failed to register with RM.", ioe);
     }
   }
 
   /**
-   * Get the request to register this Application Master with the RM.
-   * @return the populated AM request.
+   * For each container successfully allocated, attempt to set up and launch
+   * a Giraph worker/master task.
+   * @param allocatedContainers the containers we have currently allocated.
    */
-  private RegisterApplicationMasterRequest getRegisterAppMasterRequest() {
-    RegisterApplicationMasterRequest appMasterRequest =
-        Records.newRecord(RegisterApplicationMasterRequest.class);
-    appMasterRequest.setApplicationAttemptId(appAttemptId);
-    try {
-      appMasterRequest.setHost(InetAddress.getLocalHost().getHostName());
-    } catch (UnknownHostException uhe) {
-      throw new IllegalStateException(
-        "Cannot resolve GiraphApplicationMaster's local hostname.", uhe);
+  private void startContainerLaunchingThreads(final List<Container>
+    allocatedContainers) {
+    for (Container allocatedContainer : allocatedContainers) {
+      LOG.info("Launching command on a new container." +
+        ", containerId=" + allocatedContainer.getId() +
+        ", containerNode=" + allocatedContainer.getNodeId().getHost() +
+        ":" + allocatedContainer.getNodeId().getPort() +
+        ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
+        ", containerResourceMemory=" +
+        allocatedContainer.getResource().getMemory());
+      // Launch and start the container on a separate thread to keep the main
+      // thread unblocked as all containers may not be allocated at one go.
+      LaunchContainerRunnable runnableLaunchContainer =
+        new LaunchContainerRunnable(allocatedContainer, containerListener);
+      executor.execute(runnableLaunchContainer);
     }
-    // useful for a Giraph WebUI or whatever: play with these
-    // appMasterRequest.setRpcPort(appMasterRpcPort);
-    // appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-    return appMasterRequest;
   }
 
   /**
@@ -505,7 +377,7 @@ public class GiraphApplicationMaster {
    * as Giraph tasks need identical HDFS-based resources (jars etc.) to run.
    * @return the resource map for a ContainerLaunchContext
    */
-  private Map<String, LocalResource> getTaskResourceMap() {
+  private synchronized Map<String, LocalResource> getTaskResourceMap() {
     // Set the local resources: just send the copies already in HDFS
     if (null == LOCAL_RESOURCES) {
       LOCAL_RESOURCES = Maps.newHashMap();
@@ -541,40 +413,59 @@ public class GiraphApplicationMaster {
   }
 
   /**
+   * Application entry point
+   * @param args command-line args (set by GiraphYarnClient, if any)
+   */
+  public static void main(final String[] args) {
+    boolean result = false;
+    LOG.info("Starting GitaphAM ");
+    String containerIdString =  System.getenv().get(
+      Environment.CONTAINER_ID.name());
+    if (containerIdString == null) {
+      // container id should always be set in the env by the framework
+      throw new IllegalArgumentException("ContainerId not found in env vars.");
+    }
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+    ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
+    try {
+      GiraphApplicationMaster giraphAppMaster =
+        new GiraphApplicationMaster(containerId, appAttemptId);
+      result = giraphAppMaster.run();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Throwable t) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("GiraphApplicationMaster caught a " +
+                  "top-level exception in main.", t);
+      System.exit(1);
+    }
+    if (result) {
+      LOG.info("Giraph Application Master completed successfully. exiting");
+      System.exit(0);
+    } else {
+      LOG.info("Giraph Application Master failed. exiting");
+      System.exit(2);
+    }
+  }
+
+  /**
    * Thread to connect to the {@link ContainerManager} and launch the container
    * that will house one of our Giraph worker (or master) tasks.
    */
   private class LaunchContainerRunnable implements Runnable {
     /** Allocated container */
     private Container container;
-    /** Handle to communicate with ContainerManager */
-    private ContainerManager containerManager;
-    /** Heap memory in MB to allocate for this JVM in the launched container */
-    private final int heapSize;
+    /** NM listener */
+    private NMCallbackHandler containerListener;
 
     /**
      * Constructor.
      * @param newGiraphTaskContainer Allocated container
-     * @param heapMb the <code>-Xmx</code> setting for each launched task.
+     * @param containerListener container listener.
      */
     public LaunchContainerRunnable(final Container newGiraphTaskContainer,
-      final int heapMb) {
+      NMCallbackHandler containerListener) {
       this.container = newGiraphTaskContainer;
-      this.heapSize = heapMb;
-    }
-
-    /**
-     * Helper function to connect to ContainerManager, which resides on the
-     * same compute node as this Giraph task's container. The CM starts tasks.
-     */
-    private void connectToCM() {
-      LOG.debug("Connecting to CM for containerid=" + container.getId());
-      String cmIpPortStr = container.getNodeId().getHost() + ":" +
-        container.getNodeId().getPort();
-      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
-      LOG.info("Connecting to CM at " + cmIpPortStr);
-      this.containerManager = (ContainerManager)
-        rpc.getProxy(ContainerManager.class, cmAddress, giraphConf);
+      this.containerListener = containerListener;
     }
 
     /**
@@ -584,18 +475,11 @@ public class GiraphApplicationMaster {
      */
     public void run() {
       // Connect to ContainerManager
-      connectToCM();
       // configure the launcher for the Giraph task it will host
-      StartContainerRequest startReq =
-        Records.newRecord(StartContainerRequest.class);
-      startReq.setContainerLaunchContext(buildContainerLaunchContext());
+      ContainerLaunchContext ctx = buildContainerLaunchContext();
       // request CM to start this container as spec'd in ContainerLaunchContext
-      try {
-        containerManager.startContainer(startReq);
-      } catch (YarnRemoteException yre) {
-        LOG.error("StartContainerRequest failed for containerId=" +
-                    container.getId(), yre);
-      }
+      containerListener.addContainer(container.getId(), container);
+      nmClientAsync.startContainerAsync(container, ctx);
     }
 
     /**
@@ -609,11 +493,17 @@ public class GiraphApplicationMaster {
         container.getId());
       ContainerLaunchContext launchContext = Records
         .newRecord(ContainerLaunchContext.class);
-      launchContext.setContainerId(container.getId());
-      launchContext.setResource(container.getResource());
       // args inject the CLASSPATH, heap MB, and TaskAttemptID for launched task
       final List<String> commands = generateShellExecCommand();
+      LOG.info("Conatain launch Commands :" + commands.get(0));
       launchContext.setCommands(commands);
+      // Set up tokens for the container too. We are
+      // populating them mainly for NodeManagers to be able to download any
+      // files in the distributed file-system. The tokens are otherwise also
+      // useful in cases, for e.g., when one is running a
+      // "hadoop dfs" like command
+      launchContext.setTokens(allTokens.slice());
+
       // add user information to the job
       String jobUserName = "ERROR_UNKNOWN_USER";
       UserGroupInformation ugi = null;
@@ -624,7 +514,7 @@ public class GiraphApplicationMaster {
         jobUserName =
           System.getenv(ApplicationConstants.Environment.USER.name());
       }
-      launchContext.setUser(jobUserName);
+      //launchContext.setUser(jobUserName);
       LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
       // Set the environment variables to inject into remote task's container
       buildEnvironment(launchContext);
@@ -639,8 +529,8 @@ public class GiraphApplicationMaster {
      */
     private List<String> generateShellExecCommand() {
       return ImmutableList.of("java " +
-        "-Xmx" + heapSize + "M " +
-        "-Xms" + heapSize + "M " +
+        "-Xmx" + heapPerContainer + "M " +
+        "-Xms" + heapPerContainer + "M " +
         "-cp .:${CLASSPATH} " +
         "org.apache.giraph.yarn.GiraphYarnTask " +
         appAttemptId.getApplicationId().getClusterTimestamp() + " " +
@@ -671,29 +561,144 @@ public class GiraphApplicationMaster {
   }
 
   /**
-   * Application entry point
-   * @param args command-line args (set by GiraphYarnClient, if any)
+   * CallbackHandler to process RM async calls
    */
-  public static void main(final String[] args) {
-    String containerIdString =
-        System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
-    if (containerIdString == null) {
-      // container id should always be set in the env by the framework
-      throw new IllegalArgumentException("ContainerId not found in env vars.");
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onContainersCompleted(List<ContainerStatus>
+      completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt=" +
+        completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID=" +
+          containerStatus.getContainerId() + ", state=" +
+          containerStatus.getState() + ", exitStatus=" +
+          containerStatus.getExitStatus() + ", diagnostics=" +
+          containerStatus.getDiagnostics());
+        switch (containerStatus.getExitStatus()) {
+        case YARN_SUCCESS_EXIT_STATUS:
+          successfulCount.incrementAndGet();
+          break;
+        case YARN_ABORT_EXIT_STATUS:
+          break; // not success or fail
+        default:
+          failedCount.incrementAndGet();
+          break;
+        }
+        completedCount.incrementAndGet();
+      }
+
+      if (completedCount.get() == containersToLaunch) {
+        done = true;
+        LOG.info("All container compeleted. done = " + done);
+      } else {
+        LOG.info("After completion of one conatiner. current status is:" +
+          " completedCount :" + completedCount.get() +
+          " containersToLaunch :" + containersToLaunch +
+          " successfulCount :" + successfulCount.get() +
+          " failedCount :" + failedCount.get());
+      }
     }
-    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
-    ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
-    try {
-      GiraphApplicationMaster giraphAppMaster =
-        new GiraphApplicationMaster(containerId, appAttemptId);
-      giraphAppMaster.run();
-      // CHECKSTYLE: stop IllegalCatch
-    } catch (Throwable t) {
-      // CHECKSTYLE: resume IllegalCatch
-      LOG.error("GiraphApplicationMaster caught a " +
-                  "top-level exception in main.", t);
-      System.exit(2);
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt=" +
+          allocatedContainers.size());
+      allocatedCount.addAndGet(allocatedContainers.size());
+      LOG.info("Total allocated # of container so far : " +
+        allocatedCount.get() +
+        " allocated out of " + containersToLaunch + " required.");
+      startContainerLaunchingThreads(allocatedContainers);
+    }
+
+    @Override
+    public void onShutdownRequest() {
+      done = true;
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {
+    }
+
+    @Override
+    public float getProgress() {
+      // set progress to deliver to RM on next heartbeat
+      float progress = (float) completedCount.get() /
+          containersToLaunch;
+      return progress;
+    }
+
+    @Override
+    public void onError(Throwable e) {
+      done = true;
+      amRMClient.stop();
+    }
+  }
+
+  /**
+   * CallbackHandler to process NM async calls
+   */
+  private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+    /** List of containers */
+    private ConcurrentMap<ContainerId, Container> containers =
+          new ConcurrentHashMap<ContainerId, Container>();
+
+    /**
+     * Add a container
+     * @param containerId id of container
+     * @param container container object
+     * @return
+     */
+    public void addContainer(ContainerId containerId, Container container) {
+      containers.putIfAbsent(containerId, container);
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to stop Container " + containerId);
+      }
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container Status: id=" + containerId + ", status=" +
+            containerStatus);
+      }
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to start Container " + containerId);
+      }
+      Container container = containers.get(containerId);
+      if (container != null) {
+        nmClientAsync.getContainerStatusAsync(containerId,
+          container.getNodeId());
+      }
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to start Container " + containerId, t);
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onGetContainerStatusError(
+      ContainerId containerId, Throwable t) {
+      LOG.error("Failed to query the status of Container " + containerId, t);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop Container " + containerId);
+      containers.remove(containerId);
     }
-    System.exit(0);
   }
 }
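
For readers comparing the old and new APIs, the following is a minimal, self-contained sketch of the asynchronous application-master pattern this commit moves onto (AMRMClientAsync from hadoop-yarn-client 2.2.0). It is not Giraph code: the class name, container count, memory size, priority, and 1000 ms heartbeat interval are all illustrative assumptions.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SketchAppMaster {
  private static final int NUM_CONTAINERS = 3;      // illustrative
  private static final int CONTAINER_MEM_MB = 1024; // illustrative

  private final AtomicInteger completed = new AtomicInteger(0);
  private volatile boolean done = false;
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  @SuppressWarnings("unchecked")
  public void run() throws Exception {
    // 1. Start the async RM client with a heartbeat interval and callback handler.
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallback());
    amRMClient.init(new YarnConfiguration());
    amRMClient.start();
    // 2. Register this AM (host/port/tracking URL may be empty placeholders).
    amRMClient.registerApplicationMaster("", 0, "");
    // 3. Ask for containers; allocations arrive via onContainersAllocated().
    for (int i = 0; i < NUM_CONTAINERS; i++) {
      Priority pri = Records.newRecord(Priority.class);
      pri.setPriority(10);
      Resource capability = Records.newRecord(Resource.class);
      capability.setMemory(CONTAINER_MEM_MB);
      amRMClient.addContainerRequest(
          new ContainerRequest(capability, null, null, pri));
    }
    // 4. Wait for the callbacks to signal completion, then unregister.
    while (!done) {
      Thread.sleep(200);
    }
    amRMClient.unregisterApplicationMaster(
        FinalApplicationStatus.SUCCEEDED, "all containers finished", null);
    amRMClient.stop();
  }

  /** Callbacks invoked by the library's RM heartbeat thread. */
  private class RMCallback implements AMRMClientAsync.CallbackHandler {
    @Override public void onContainersAllocated(List<Container> containers) {
      // Launch work on each container here (e.g. via NMClientAsync.startContainerAsync).
    }
    @Override public void onContainersCompleted(List<ContainerStatus> statuses) {
      if (completed.addAndGet(statuses.size()) >= NUM_CONTAINERS) {
        done = true;
      }
    }
    @Override public void onShutdownRequest() { done = true; }
    @Override public void onNodesUpdated(List<NodeReport> updated) { }
    @Override public float getProgress() {
      return completed.get() / (float) NUM_CONTAINERS;
    }
    @Override public void onError(Throwable e) { done = true; }
  }

  public static void main(String[] args) throws Exception {
    new SketchAppMaster().run();
  }
}

The design difference is visible in the diff above: the old code built AllocateRequest heartbeats and polled the RM in its own loops, whereas the stable API keeps a library-managed heartbeat thread and delivers allocations and completions through the callback handler.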

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
index 341db0e..ab6564e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -30,6 +30,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,10 +43,13 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
 
 import org.apache.log4j.Logger;
@@ -50,6 +57,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.nio.ByteBuffer;
 
 /**
  * The initial launcher for a YARN-based Giraph job. This class attempts to
@@ -57,7 +65,7 @@ import java.util.Map;
  * application container to host GiraphApplicationMaster. The RPC connection
  * between the RM and GiraphYarnClient is the YARN ApplicationManager.
  */
-public class GiraphYarnClient extends YarnClientImpl {
+public class GiraphYarnClient {
   static {
     Configuration.addDefaultResource("giraph-site.xml");
   }
@@ -66,7 +74,7 @@ public class GiraphYarnClient extends YarnClientImpl {
   /** Sleep time between silent progress checks */
   private static final int JOB_STATUS_INTERVAL_MSECS = 800;
   /** Memory (in MB) to allocate for our ApplicationMaster container */
-  private static final int YARN_APP_MASTER_MEMORY_MB = 1024;
+  private static final int YARN_APP_MASTER_MEMORY_MB = 512;
 
   /** human-readable job name */
   private final String jobName;
@@ -76,6 +84,8 @@ public class GiraphYarnClient extends YarnClientImpl {
   private ApplicationId appId;
   /** # of sleeps between progress reports to client */
   private int reportCounter;
+  /** Yarn client object */
+  private YarnClient yarnClient;
 
   /**
    * Constructor. Requires caller to hand us a GiraphConfiguration.
@@ -85,13 +95,13 @@ public class GiraphYarnClient extends YarnClientImpl {
    */
   public GiraphYarnClient(GiraphConfiguration giraphConf, String jobName)
     throws IOException {
-    super();
     this.reportCounter = 0;
     this.jobName = jobName;
     this.appId = null; // can't set this until after start()
     this.giraphConf = giraphConf;
     verifyOutputDirDoesNotExist();
-    super.init(this.giraphConf);
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(giraphConf);
   }
 
   /**
@@ -102,35 +112,53 @@ public class GiraphYarnClient extends YarnClientImpl {
    * @param verbose Not implemented yet, to provide compatibility w/GiraphJob
    * @return true if job is successful
    */
-  public boolean run(final boolean verbose) {
+  public boolean run(final boolean verbose) throws YarnException, IOException {
     checkJobLocalZooKeeperSupported();
     // init our connection to YARN ResourceManager RPC
-    start();
+    LOG.info("Running Client");
+    yarnClient.start();
     // request an application id from the RM
-    GetNewApplicationResponse getNewAppResponse;
-    try {
-      getNewAppResponse = super.getNewApplication();
-      // make sure we have the cluster resources to run the job.
-      checkPerNodeResourcesAvailable(getNewAppResponse);
-    } catch (YarnRemoteException yre) {
-      yre.printStackTrace();
-      return false;
-    }
-    appId = getNewAppResponse.getApplicationId();
+    // Get a new application id
+    YarnClientApplication app = yarnClient.createApplication();
+    GetNewApplicationResponse getNewAppResponse = app.
+      getNewApplicationResponse();
+    checkPerNodeResourcesAvailable(getNewAppResponse);
+    // configure our request for an exec container for GiraphApplicationMaster
+    ApplicationSubmissionContext appContext = app.
+      getApplicationSubmissionContext();
+    appId = appContext.getApplicationId();
+    //createAppSubmissionContext(appContext);
+    appContext.setApplicationId(appId);
+    appContext.setApplicationName(jobName);
     LOG.info("Obtained new Application ID: " + appId);
     // sanity check
     applyConfigsForYarnGiraphJob();
-    // configure our request for an exec container for GiraphApplicationMaster
-    ApplicationSubmissionContext appContext = createAppSubmissionContext();
+
     ContainerLaunchContext containerContext = buildContainerLaunchContext();
+    appContext.setResource(buildContainerMemory());
     appContext.setAMContainerSpec(containerContext);
     LOG.info("ApplicationSumbissionContext for GiraphApplicationMaster " +
       "launch container is populated.");
-    // make the request, blow up if fail, loop and report job progress if not
+    //TODO: priority and queue
+    // Set the priority for the application master
+    //Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide?
+    //pri.setPriority(amPriority);
+    //appContext.setPriority(pri);
+
+    // Set the queue to which this application is to be submitted in the RM
+    //appContext.setQueue(amQueue);
+
+    // make the request, blow up if fail, loop and report job progress if not
     try {
+      LOG.info("Submitting application to ASM");
       // obtain an "updated copy" of the appId for status checks/job kill later
-      appId = super.submitApplication(appContext);
-    } catch (YarnRemoteException yre) {
+      appId = yarnClient.submitApplication(appContext);
+      LOG.info("Got new appId after submission :" + appId);
+    } catch (YarnException yre) {
+      // TODO
+      // Try submitting the same request again
+      // app submission failure?
       throw new RuntimeException("submitApplication(appContext) FAILED.", yre);
     }
     LOG.info("GiraphApplicationMaster container request was submitted to " +
@@ -175,19 +203,24 @@ public class GiraphYarnClient extends YarnClientImpl {
    * @param cluster the GetNewApplicationResponse from the YARN RM.
    */
   private void checkPerNodeResourcesAvailable(
-    final GetNewApplicationResponse cluster) {
+    final GetNewApplicationResponse cluster) throws YarnException, IOException {
     // are there enough containers to go around for our Giraph job?
     List<NodeReport> nodes = null;
     int numContainers = 0;
     long totalAvailable = 0;
     try {
-      nodes = super.getNodeReports();
-    } catch (YarnRemoteException yre) {
+      nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+    } catch (YarnException yre) {
       throw new RuntimeException("GiraphYarnClient could not connect with " +
         "the YARN ResourceManager to determine the number of available " +
         "application containers.", yre);
     }
     for (NodeReport node : nodes) {
+      LOG.info("Got node report from ASM for" +
+        ", nodeId=" + node.getNodeId() +
+        ", nodeAddress" + node.getHttpAddress() +
+        ", nodeRackName" + node.getRackName() +
+        ", nodeNumContainers" + node.getNumContainers());
       numContainers += node.getNumContainers();
       totalAvailable += node.getCapability().getMemory();
     }
@@ -213,9 +246,11 @@ public class GiraphYarnClient extends YarnClientImpl {
    * and re-record the new settings in the GiraphConfiguration for export.
    * @param gnar the GetNewAppResponse from the YARN ResourceManager.
    */
-  private void checkAndAdjustPerTaskHeapSize(GetNewApplicationResponse gnar) {
+  private void checkAndAdjustPerTaskHeapSize(
+    final GetNewApplicationResponse gnar) {
     // do we have the right heap size on these cluster nodes to run our job?
-    final int minCapacity = gnar.getMinimumResourceCapability().getMemory();
+    //TODO:
+    //final int minCapacity = gnar.getMinimumResourceCapability().getMemory();
     final int maxCapacity = gnar.getMaximumResourceCapability().getMemory();
     // make sure heap size is OK for this cluster's available containers
     int giraphMem = giraphConf.getYarnTaskHeapMb();
@@ -227,11 +262,11 @@ public class GiraphYarnClient extends YarnClientImpl {
         "minimum; downgrading Giraph to" + maxCapacity + "MB.");
       giraphMem = maxCapacity;
     }
-    if (giraphMem < minCapacity) {
+    /*if (giraphMem < minCapacity) { //TODO:
       LOG.info("Giraph's request of heap MB per-task is less than the " +
         "minimum; upgrading Giraph to " + minCapacity + "MB.");
       giraphMem = minCapacity;
-    }
+    }*/
     giraphConf.setYarnTaskHeapMb(giraphMem); // record any changes made
   }
 
@@ -240,7 +275,7 @@ public class GiraphYarnClient extends YarnClientImpl {
    * just sleep and wait for the job to finish. If no AM response, kill the app.
    * @return true if job run is successful.
    */
-  private boolean awaitGiraphJobCompletion() {
+  private boolean awaitGiraphJobCompletion() throws YarnException, IOException {
     boolean done;
     ApplicationReport report = null;
     try {
@@ -250,7 +285,7 @@ public class GiraphYarnClient extends YarnClientImpl {
         } catch (InterruptedException ir) {
           LOG.info("Progress reporter's sleep was interrupted!", ir);
         }
-        report = super.getApplicationReport(appId);
+        report = yarnClient.getApplicationReport(appId);
         done = checkProgress(report);
       } while (!done);
       if (!giraphConf.metricsEnabled()) {
@@ -263,8 +298,8 @@ public class GiraphYarnClient extends YarnClientImpl {
         diagnostics, ex);
       try {
         LOG.error("FORCIBLY KILLING Application from AppMaster.");
-        super.killApplication(appId);
-      } catch (YarnRemoteException yre) {
+        yarnClient.killApplication(appId);
+      } catch (YarnException yre) {
         LOG.error("Exception raised in attempt to kill application.", yre);
       }
       return false;
@@ -291,10 +326,10 @@ public class GiraphYarnClient extends YarnClientImpl {
    * Print final formatted job report for local client that initiated this run.
    * @return true for app success, false for failure.
    */
-  private boolean printFinalJobReport() {
+  private boolean printFinalJobReport() throws YarnException, IOException {
     ApplicationReport report;
     try {
-      report = super.getApplicationReport(appId);
+      report = yarnClient.getApplicationReport(appId);
       FinalApplicationStatus finalAppStatus =
         report.getFinalApplicationStatus();
       final long secs =
@@ -303,7 +338,7 @@ public class GiraphYarnClient extends YarnClientImpl {
         secs / 60L, secs % 60L);
       LOG.info("Completed " + jobName + ": " +
         finalAppStatus.name() + ", total running time: " + time);
-    } catch (YarnRemoteException yre) {
+    } catch (YarnException yre) {
       LOG.error("Exception encountered while attempting to request " +
         "a final job report for " + jobName , yre);
       return false;
@@ -315,18 +350,50 @@ public class GiraphYarnClient extends YarnClientImpl {
    * Compose the ContainerLaunchContext for the Application Master.
    * @return the CLC object populated and configured.
    */
-  private ContainerLaunchContext buildContainerLaunchContext() {
+  private ContainerLaunchContext buildContainerLaunchContext()
+    throws IOException {
     ContainerLaunchContext appMasterContainer =
       Records.newRecord(ContainerLaunchContext.class);
     appMasterContainer.setEnvironment(buildEnvironment());
     appMasterContainer.setLocalResources(buildLocalResourceMap());
     appMasterContainer.setCommands(buildAppMasterExecCommand());
-    appMasterContainer.setResource(buildContainerMemory());
-    appMasterContainer.setUser(ApplicationConstants.Environment.USER.name());
+    //appMasterContainer.setResource(buildContainerMemory());
+    //appMasterContainer.setUser(ApplicationConstants.Environment.USER.name());
+    setToken(appMasterContainer);
     return appMasterContainer;
   }
 
   /**
+   * Set delegation tokens for AM container
+   * @param amContainer AM container
+   * @throws IOException if delegation tokens cannot be obtained
+   */
+  private void setToken(ContainerLaunchContext amContainer) throws IOException {
+    // Setup security tokens
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials credentials = new Credentials();
+      String tokenRenewer = giraphConf.get(YarnConfiguration.RM_PRINCIPAL);
+      if (tokenRenewer == null || tokenRenewer.length() == 0) {
+        throw new IOException(
+          "Can't get Master Kerberos principal for the RM to use as renewer");
+      }
+      FileSystem fs = FileSystem.get(giraphConf);
+      // For now, only getting tokens for the default file-system.
+      final Token<?> [] tokens =
+        fs.addDelegationTokens(tokenRenewer, credentials);
+      if (tokens != null) {
+        for (Token<?> token : tokens) {
+          LOG.info("Got dt for " + fs.getUri() + "; " + token);
+        }
+      }
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      amContainer.setTokens(fsTokens);
+    }
+  }
+
+  /**
    * Assess whether job is already finished/failed and 'done' flag needs to be
    * set, prints progress display for client if all is going well.
    * @param report the application report to assess.
@@ -376,6 +443,7 @@ public class GiraphYarnClient extends YarnClientImpl {
     return ImmutableList.of("${JAVA_HOME}/bin/java " +
       "-Xmx" + YARN_APP_MASTER_MEMORY_MB + "M " +
       "-Xms" + YARN_APP_MASTER_MEMORY_MB + "M " + // TODO: REMOVE examples jar!
+      //TODO: Make constant
       "-cp .:${CLASSPATH} org.apache.giraph.yarn.GiraphApplicationMaster " +
       "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stdout.log " +
       "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stderr.log "
@@ -403,6 +471,7 @@ public class GiraphYarnClient extends YarnClientImpl {
   private void addLocalJarsToResourceMap(Map<String, LocalResource> map)
     throws IOException {
     Set<String> jars = Sets.newHashSet();
+    LOG.info("LIB JARS :" + giraphConf.getYarnLibJars());
     String[] libJars = giraphConf.getYarnLibJars().split(",");
     for (String libJar : libJars) {
       jars.add(libJar);
@@ -410,8 +479,8 @@ public class GiraphYarnClient extends YarnClientImpl {
     FileSystem fs = FileSystem.get(giraphConf);
     Path baseDir = YarnUtils.getFsCachePath(fs, appId);
     for (Path jar : YarnUtils.getLocalFiles(jars)) {
-      LOG.info("Located local resource for export at: " + jar);
       Path dest = new Path(baseDir, jar.getName());
+      LOG.info("Made local resource for :" + jar + " to " +  dest);
       fs.copyFromLocalFile(false, true, jar, dest);
       YarnUtils.addFileToResourceMap(map, fs, dest);
     }
@@ -423,7 +492,7 @@ public class GiraphYarnClient extends YarnClientImpl {
    */
   private Resource buildContainerMemory() {
     Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(YARN_APP_MASTER_MEMORY_MB);
+    capability.setMemory(YARN_APP_MASTER_MEMORY_MB); //Configurable thru CLI?
     return capability;
   }
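[Note] buildContainerMemory() still hard-codes YARN_APP_MASTER_MEMORY_MB, and
the new inline comment asks whether it should be configurable from the command
line. A sketch of that idea, reusing the class's existing giraphConf field and
a hypothetical key name that is not an existing Giraph option:

    // "giraph.yarn.am.heap.mb" is a hypothetical key, used only to illustrate
    // the TODO; the committed code keeps the constant.
    private Resource buildContainerMemory() {
      Resource capability = Records.newRecord(Resource.class);
      int amMemoryMb = giraphConf.getInt("giraph.yarn.am.heap.mb",
        YARN_APP_MASTER_MEMORY_MB);
      capability.setMemory(amMemoryMb);
      return capability;
    }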
 
@@ -435,8 +504,10 @@ public class GiraphYarnClient extends YarnClientImpl {
   private Map<String, String> buildEnvironment() {
     Map<String, String> environment =
       Maps.<String, String>newHashMap();
+    LOG.info("Set the environment for the application master");
     YarnUtils.addLocalClasspathToEnv(environment, giraphConf);
-    // TODO: add java.class.path to env map if running a local YARN minicluster.
+    //TODO: add the runtime classpath needed for tests to work
+    LOG.info("Environment for AM :" + environment);
     return environment;
   }
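[Note] The rewritten TODO in buildEnvironment() refers to local test runs (e.g.
against a MiniYARNCluster), where the launched AM also needs the submitting
JVM's classpath. A purely illustrative fragment of that idea, meant to sit
inside buildEnvironment() and assuming java.io.File is imported:

    // Illustrative only: append the client JVM's own classpath so an AM
    // started by a local minicluster can resolve test classes.
    String existing = environment.get("CLASSPATH");
    environment.put("CLASSPATH",
      (existing == null ? "" : existing + File.pathSeparator) +
      System.getProperty("java.class.path"));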
 
@@ -444,33 +515,29 @@ public class GiraphYarnClient extends YarnClientImpl {
    * Create the mapping of files and JARs to send to the GiraphApplicationMaster
    * and from there on to the Giraph tasks.
    * @return the map of jars to local resource paths for transport
-   *         to the host container that will run our AppMaster.
+   *   to the host container that will run our AppMaster.
    */
   private Map<String, LocalResource> buildLocalResourceMap() {
+    // set local resources for the application master
+    // local files or archives as needed
+    // In this scenario, the jar file for the application master
+    //is part of the local resources
     Map<String, LocalResource> localResources =
         Maps.<String, LocalResource>newHashMap();
+    LOG.info("buildLocalResourceMap ....");
     try {
       // export the GiraphConfiguration to HDFS for localization to remote tasks
+      //Ques: Merge the following two method
       YarnUtils.exportGiraphConfiguration(giraphConf, appId);
       YarnUtils.addGiraphConfToLocalResourceMap(
         giraphConf, appId, localResources);
       // add jars from '-yj' cmd-line arg to resource map for localization
       addLocalJarsToResourceMap(localResources);
+      //TODO: log4j?
       return localResources;
     } catch (IOException ioe) {
       throw new IllegalStateException("Failed to build LocalResouce map.", ioe);
     }
   }
 
-  /**
-   * Create the app submission context, and populate it.
-   * @return the populated ApplicationSubmissionContext for the AppMaster.
-   */
-  private ApplicationSubmissionContext createAppSubmissionContext() {
-    ApplicationSubmissionContext appContext =
-      Records.newRecord(ApplicationSubmissionContext.class);
-    appContext.setApplicationId(appId);
-    appContext.setApplicationName(jobName);
-    return appContext;
-  }
 }
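[Note] awaitGiraphJobCompletion() polls yarnClient.getApplicationReport(appId)
and delegates the done-check to checkProgress(report), which this diff does not
modify. Under the stable API such a check typically inspects
YarnApplicationState; a sketch, not the method shipped in this commit:

    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.api.records.YarnApplicationState;

    /** Sketch: has the application reached a terminal state? */
    static boolean isTerminal(ApplicationReport report) {
      YarnApplicationState state = report.getYarnApplicationState();
      return state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.KILLED ||
        state == YarnApplicationState.FAILED;
    }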

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
index aa042e8..2e0602a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
@@ -68,7 +68,8 @@ public class YarnUtils {
     for (String fileName : giraphConf.getYarnLibJars().split(",")) {
       if (fileName.length() > 0) {
         Path filePath = new Path(baseDir, fileName);
-        LOG.info("Adding " + fileName + " to LocalResources for export.");
+        LOG.info("Adding " + fileName + " to LocalResources for export.to " +
+          filePath);
         if (fileName.contains("giraph-core")) {
           coreJarFound = true;
         }
@@ -97,6 +98,7 @@ public class YarnUtils {
     }
     classPath += System.getenv("CLASSPATH");
     for (String baseDir : classPath.split(":")) {
+      LOG.info("Class path name " + baseDir);
       if (baseDir.length() > 0) {
         // lose the globbing chars that will fail in File#listFiles
         final int lastFileSep = baseDir.lastIndexOf("/");
@@ -106,6 +108,7 @@ public class YarnUtils {
             baseDir = baseDir.substring(0, lastFileSep);
           }
         }
+        LOG.info("base path checking " + baseDir);
         populateJarList(new File(baseDir), jarPaths, fileNames);
       }
       if (jarPaths.size() >= fileNames.size()) {
@@ -153,7 +156,7 @@ public class YarnUtils {
     resource.setType(LocalResourceType.FILE); // use FILE, even for jars!
     resource.setVisibility(LocalResourceVisibility.APPLICATION);
     localResources.put(target.getName(), resource);
-    LOG.info("Registered file in LocalResources: " + target.getName());
+    LOG.info("Registered file in LocalResources :: " + target);
   }
 
   /**
@@ -180,7 +183,7 @@ public class YarnUtils {
     for (String cpEntry : giraphConf.getStrings(
       YarnConfiguration.YARN_APPLICATION_CLASSPATH,
       YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-      classPathEnv.append(':').append(cpEntry.trim());
+      classPathEnv.append(':').append(cpEntry.trim()); //TODO: Separator
     }
     for (String cpEntry : giraphConf.getStrings(
       MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
@@ -220,7 +223,9 @@ public class YarnUtils {
     File confFile = new File(System.getProperty("java.io.tmpdir"),
       GiraphConstants.GIRAPH_YARN_CONF_FILE);
     if (confFile.exists()) {
-      confFile.delete();
+      if (!confFile.delete()) {
+        LOG.warn("Unable to delete file " + confFile);
+      }
     }
     String localConfPath = confFile.getAbsolutePath();
     FileOutputStream fos = null;
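[Note] The last YarnUtils.java hunk shows only the tail of registering a file
as a LocalResource. For context, a generic stable-API registration, not
necessarily identical to YarnUtils#addFileToResourceMap, also records the
file's size and timestamp from its FileStatus so the NodeManager can verify the
localized copy:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.util.ConverterUtils;
    import org.apache.hadoop.yarn.util.Records;

    /** Sketch: register one HDFS file for localization into a container. */
    static void registerLocalResource(FileSystem fs, Path target,
      Map<String, LocalResource> localResources) throws IOException {
      FileStatus stat = fs.getFileStatus(target);
      LocalResource resource = Records.newRecord(LocalResource.class);
      resource.setResource(ConverterUtils.getYarnUrlFromPath(target));
      resource.setSize(stat.getLen());
      resource.setTimestamp(stat.getModificationTime());
      resource.setType(LocalResourceType.FILE);
      resource.setVisibility(LocalResourceVisibility.APPLICATION);
      localResources.put(target.getName(), resource);
    }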

http://git-wip-us.apache.org/repos/asf/giraph/blob/3a20c559/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 43804f6..cc2c9a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -868,78 +868,58 @@ under the License.
          All profiles below are munge-free: avoid introducing any munge
          flags on any of the following profiles. -->
     <profile>
-      <id>hadoop_2.0.0</id>
-      <properties>
-        <hadoop.version>2.0.0-alpha</hadoop.version>
-      </properties>
-      <dependencies>
-        <!-- sorted lexicographically -->
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-common</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>hadoop_2.0.1</id>
-      <properties>
-        <hadoop.version>2.0.1-alpha</hadoop.version>
-      </properties>
-      <dependencies>
-        <!-- sorted lexicographically -->
+       <id>2.1.1-SNAPSHOT</id>
+       <properties>
+         <hadoop.version>2.1.1-SNAPSHOT</hadoop.version>
+       </properties>
+       <dependencies>
+         <!-- sorted lexicographically -->
         <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
+          <groupId>commons-configuration</groupId>
+          <artifactId>commons-configuration</artifactId>
         </dependency>
         <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-common</artifactId>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <artifactId>hadoop-auth</artifactId>
         </dependency>
-      </dependencies>
-    </profile>
+         <dependency>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-common</artifactId>
+         </dependency>
+         <dependency>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-mapreduce-client-common</artifactId>
+         </dependency>
+         <dependency>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-mapreduce-client-core</artifactId>
+         </dependency>
+       </dependencies>
+     </profile>
 
     <profile>
-      <id>hadoop_2.0.2</id>
-      <properties>
-        <hadoop.version>2.0.2-alpha</hadoop.version>
-      </properties>
-      <dependencies>
-        <!-- sorted lexicographically -->
+       <id>2.2.0</id>
+       <properties>
+         <hadoop.version>2.2.0</hadoop.version>
+       </properties>
+       <dependencies>
+         <!-- sorted lexicographically -->
         <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
+          <groupId>commons-configuration</groupId>
+          <artifactId>commons-configuration</artifactId>
         </dependency>
         <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-common</artifactId>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <artifactId>hadoop-auth</artifactId>
         </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-       <id>hadoop_2.0.3</id>
-       <properties>
-         <hadoop.version>2.0.3-alpha</hadoop.version>
-       </properties>
-       <dependencies>
-         <!-- sorted lexicographically -->
          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
@@ -956,7 +936,7 @@ under the License.
      </profile>
 
     <profile>
-      <id>hadoop_trunk</id>
+      <id>hadoop_snapshot</id>
       <properties>
         <hadoop.version>3.0.0-SNAPSHOT</hadoop.version>
       </properties>

