hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1683197 - in /hama/trunk: ./ yarn/src/main/java/org/apache/hama/bsp/
Date Tue, 02 Jun 2015 23:26:58 GMT
Author: edwardyoon
Date: Tue Jun  2 23:26:58 2015
New Revision: 1683197

URL: http://svn.apache.org/r1683197
Log:
HAMA-939: Refactoring which was implement using out-of-date status response

Added:
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
Removed:
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1683197&r1=1683196&r2=1683197&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Jun  2 23:26:58 2015
@@ -26,6 +26,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
   
+   HAMA-939: Refactoring which was implement using out-of-date status response (Minho Kim via edwardyoon)
    HAMA-955: Support UnsafeByteArrayInputStream and UnSafeByteArrayOutputStream (Minho Kim via edwardyoon)
    HAMA-944: Add JSON format option to fastgen command (Minho Kim via edwardyoon)
    HAMA-919: Manage messages per Vertex (edwardyoon)

Added: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java?rev=1683197&view=auto
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (added)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java Tue Jun  2 23:26:58 2015
@@ -0,0 +1,1011 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.Server;
+import org.apache.hama.util.BSPNetUtils;
+import org.apache.log4j.LogManager;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * YARN ApplicationMaster for a Hama BSP job. It loads the submitted job
+ * configuration, runs the job's {@link SyncServer} (ZooKeeper) and RPC servers
+ * inside the AM container, requests one container per BSP peer from the
+ * ResourceManager and launches a {@code BSPRunner} process in each of them.
+ * It also implements {@link BSPClient} and {@link BSPPeerProtocol}; those
+ * methods are presumably served through the RPC servers started in
+ * {@code init} — TODO confirm.
+ */
+public class ApplicationMaster  implements BSPClient, BSPPeerProtocol {
+  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+  // Configuration
+  // localConf: YARN configuration used to initialize the RM/NM clients.
+  // jobConf: configuration loaded from the submitted job file.
+  private Configuration localConf;
+  private Configuration jobConf;
+
+  // Path of the submitted job file (the AM's only command-line argument).
+  private String jobFile;
+  private String applicationName;
+  // Host and port of the client-facing RPC server; the port is picked with
+  // BSPNetUtils.getFreePort(12000) in init().
+  private String hostname;
+  private int clientPort;
+  private FileSystem fs;
+  // Task id handed to the next launched container. NOTE(review): this is a
+  // plain static int, incremented on the RM callback thread after the launch
+  // thread has been started, so a launch thread reading it later may observe
+  // an already-incremented value — confirm task ids stay unique.
+  private static int id = 0;
+
+  // Presumably maintained by the BSPPeerProtocol/BSPClient methods, which
+  // are not shown here — TODO confirm.
+  private volatile long superstep;
+  private Counters globalCounter = new Counters();
+  // Input splits read from "bsp.job.split.file" in init(); null if none.
+  private BSPJobClient.RawSplit[] splits;
+
+  private BSPJobID jobId;
+
+  // SyncServer (ZooKeeper) started by startSyncServer(); this host is
+  // published as the ZooKeeper quorum in jobConf.
+  private SyncServer syncServer;
+
+  // Single background thread that runs the SyncServer (see startSyncServer)
+  private static final ExecutorService threadPool = Executors
+      .newFixedThreadPool(1);
+
+  // Port of the task-facing RPC server (presumably the BSPPeerProtocol
+  // endpoint — TODO confirm in startRPCServers)
+  private int taskServerPort;
+
+  private Server clientServer;
+  private Server taskServer;
+
+  // Handle to communicate with the Resource Manager
+  @SuppressWarnings("rawtypes")
+  private AMRMClientAsync amRMClient;
+
+  // In both secure and non-secure modes, this points to the job-submitter.
+  @VisibleForTesting
+  UserGroupInformation appSubmitterUgi;
+
+  // Handle to communicate with the Node Manager
+  private NMClientAsync nmClientAsync;
+  // Listen to process the response from the Node Manager
+  private NMCallbackHandler containerListener;
+
+  // Application attempt id, derived from the AM's CONTAINER_ID environment
+  // variable in init()
+  @VisibleForTesting
+  protected ApplicationAttemptId appAttemptID;
+
+
+  // TODO
+  // For status update for clients - yet to be implemented
+  // Hostname of the container
+  private String appMasterHostname = "";
+  // Port on which the app master listens for status updates from clients
+  // (passed to the RM on registration)
+  private int appMasterRpcPort = -1;
+  // Tracking url to which app master publishes info for clients to monitor
+  private String appMasterTrackingUrl = "";
+
+  // App Master configuration
+  // Number of task containers: one per BSP peer ("bsp.peers.num")
+  @VisibleForTesting
+  protected int numTotalContainers;
+  // Memory to request for each task container; clamped to the cluster max
+  private int containerMemory;
+  // Virtual cores to request for each task container; clamped to the cluster max
+  private int containerVirtualCores = 1;
+
+  // Priority of the request
+  private int requestPriority = 0;
+
+  // Counter for completed containers ( complete denotes successful or failed )
+  private AtomicInteger numCompletedContainers = new AtomicInteger();
+  // Allocated container count so that we know how many containers has the RM
+  // allocated to us
+  @VisibleForTesting
+  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
+  // Count of failed containers
+  private AtomicInteger numFailedContainers = new AtomicInteger();
+  // Count of containers already requested from the RM
+  // Needed as once requested, we should not request for containers again.
+  // Only request for more if the original requirement changes.
+  @VisibleForTesting
+  protected AtomicInteger numRequestedContainers = new AtomicInteger();
+
+  // Set by the RM callbacks (all completed, shutdown request or error);
+  // polled by finish()
+  private volatile boolean done;
+  // Serialized credentials handed to every task container launch context
+  private ByteBuffer allTokens;
+
+  // Launch threads, joined in finish(). NOTE(review): appended from the RM
+  // callback thread while finish() may iterate it; ArrayList is not
+  // thread-safe.
+  private List<Thread> launchThreads = new ArrayList<Thread>();
+
+  // Ids of containers launched by this attempt (or carried over from a
+  // previous one); completion events for any other container are ignored
+  @VisibleForTesting
+  protected final Set<ContainerId> launchedContainers =
+      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+
+  /**
+   * Creates an ApplicationMaster whose local configuration is a fresh
+   * {@link YarnConfiguration}; the job configuration is loaded later by
+   * {@link #init(String[])}.
+   */
+  public ApplicationMaster() {
+    localConf = new YarnConfiguration();
+  }
+
+  /**
+   * Entry point of the AM container. Initializes, runs and waits for the job,
+   * then exits the JVM with status 0 on success or 2 on failure. An uncaught
+   * error is logged and reported through {@code ExitUtil.terminate} with
+   * status 1.
+   *
+   * @param args a single argument: the path of the submitted job file
+   */
+  public static void main(String[] args) throws IOException {
+    final ApplicationMaster master = new ApplicationMaster();
+    boolean succeeded = false;
+
+    try {
+      LOG.info("Initializing ApplicationMaster");
+      if (!master.init(args)) {
+        System.exit(0);
+      }
+      master.run();
+      succeeded = master.finish();
+    } catch (Throwable t) {
+      LOG.fatal("Error running ApplicationMaster", t);
+      LogManager.shutdown();
+      ExitUtil.terminate(1, t);
+    } finally {
+      master.close();
+    }
+
+    if (!succeeded) {
+      LOG.info("Application Master failed. exiting");
+      System.exit(2);
+    }
+    LOG.info("Application Master completed successfully. exiting");
+    System.exit(0);
+  }
+
+  /**
+   * Loads the job configuration, derives the job id and resource needs, starts
+   * the synchronization service and RPC servers, rewrites the submitted job
+   * file, and reads the input splits (if the job has any).
+   *
+   * @param args exactly one element: the path of the submitted job file
+   * @return {@code true} when the AM should proceed to {@link #run()}
+   * @throws IllegalArgumentException if {@code args} is null or does not
+   *           contain exactly one element
+   * @throws Exception if loading the configuration, starting the services or
+   *           reading the split file fails
+   */
+  public boolean init(String[] args) throws Exception {
+    if (args == null || args.length != 1) {
+      throw new IllegalArgumentException(
+          "Expected exactly one argument (the job file path) but got "
+              + (args == null ? 0 : args.length));
+    }
+    this.jobFile = args[0];
+    this.jobConf = getSubmitConfiguration(jobFile);
+    // NOTE(review): this adds localConf to itself and imports no job
+    // settings; it may have been meant as addResource(jobConf). Left as is
+    // because the RM/NM clients are initialized from localConf.
+    localConf.addResource(localConf);
+    fs = FileSystem.get(jobConf);
+
+    this.applicationName = jobConf.get("bsp.job.name",
+        "<no bsp job name defined>");
+    if (applicationName.isEmpty()) {
+      this.applicationName = "<no bsp job name defined>";
+    }
+
+    appAttemptID = getApplicationAttemptId();
+    this.jobId = new BSPJobID(appAttemptID.toString(), 0);
+    this.appMasterHostname = BSPNetUtils.getCanonicalHostname();
+    // NOTE(review): hard-coded tracking URL (presumably the default RM web UI).
+    this.appMasterTrackingUrl = "http://localhost:8088";
+    this.numTotalContainers = this.jobConf.getInt("bsp.peers.num", 1);
+    this.containerMemory = getMemoryRequirements(jobConf);
+
+    this.hostname = BSPNetUtils.getCanonicalHostname();
+    this.clientPort = BSPNetUtils.getFreePort(12000);
+
+    // The SyncServer (ZooKeeper) runs inside this AM, so this host is the quorum.
+    this.jobConf.set(Constants.ZOOKEEPER_QUORUM, appMasterHostname);
+
+    // start our synchronization service
+    startSyncServer();
+
+    // start RPC server
+    startRPCServers();
+
+    /*
+     * Must run after the RPC servers have been started, because starting them
+     * readjusts the configuration and the rewritten job file must include it.
+     */
+    rewriteSubmitConfiguration(jobFile, jobConf);
+
+    String jobSplit = jobConf.get("bsp.job.split.file");
+    splits = null;
+    if (jobSplit != null) {
+      DataInputStream splitFile = fs.open(new Path(jobSplit));
+      try {
+        splits = BSPJobClient.readSplitFile(splitFile);
+      } finally {
+        splitFile.close();
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Main run function for the application master. Prepares the security
+   * tokens handed to task containers, starts the asynchronous RM and NM
+   * clients, registers with the ResourceManager, clamps the per-container
+   * resource ask to the cluster maximum and requests the containers that
+   * previous attempts did not leave running. It returns once the requests are
+   * queued; {@link #finish()} waits for the containers.
+   *
+   * @throws org.apache.hadoop.yarn.exceptions.YarnException e.g. if the
+   *           registration with the ResourceManager fails
+   * @throws IOException on credential serialization or RPC failures
+   * @throws InterruptedException declared by the signature; no call visible
+   *           in this method throws it
+   */
+  @SuppressWarnings({ "unchecked" })
+  public void run() throws YarnException, IOException, InterruptedException {
+    LOG.info("Starting ApplicationMaster");
+
+    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
+    // are marked as LimitedPrivate
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    // Remove the AM->RM token so that containers cannot access it. This must
+    // happen BEFORE the credentials are serialized into allTokens; serializing
+    // first would still ship the token to every task container.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    LOG.info("Executing with tokens:");
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      LOG.info(token);
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    // Create appSubmitterUgi for the submitting user and give it the same
+    // (AMRM-token-free) credentials
+    String appSubmitterUserName =
+        System.getenv(ApplicationConstants.Environment.USER.name());
+    appSubmitterUgi =
+        UserGroupInformation.createRemoteUser(appSubmitterUserName);
+    appSubmitterUgi.addCredentials(credentials);
+
+    // Async RM client; RM events are delivered to RMCallbackHandler.
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+    amRMClient.init(localConf);
+    amRMClient.start();
+
+    // Async NM client; container events are delivered to containerListener.
+    containerListener = createNMCallbackHandler();
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
+    nmClientAsync.init(localConf);
+    nmClientAsync.start();
+
+    // TODO need to setup a protocol for client to be able to communicate to
+    // the RPC server, and use the rpc port info to register with the RM for
+    // the client to send requests to this app master
+
+    // Register self with ResourceManager
+    // This will start heartbeating to the RM
+    appMasterHostname = NetUtils.getHostname();
+    RegisterApplicationMasterResponse response = amRMClient
+        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Max mem capability of resources in this cluster " + maxMem);
+
+    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
+
+    // A resource ask cannot exceed the max.
+    if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerMemory + ", max="
+          + maxMem);
+      containerMemory = maxMem;
+    }
+
+    if (containerVirtualCores > maxVCores) {
+      LOG.info("Container virtual cores specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+          + maxVCores);
+      containerVirtualCores = maxVCores;
+    }
+
+    // Containers kept alive from previous attempts count as already
+    // allocated and launched.
+    List<Container> previousAMRunningContainers =
+        response.getContainersFromPreviousAttempts();
+    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+        + " previous attempts' running containers on AM registration.");
+    for (Container container : previousAMRunningContainers) {
+      launchedContainers.add(container.getId());
+    }
+    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
+
+    // Request only the containers still missing; allocations arrive
+    // asynchronously in RMCallbackHandler.onContainersAllocated.
+    int numTotalContainersToRequest =
+        numTotalContainers - previousAMRunningContainers.size();
+    for (int i = 0; i < numTotalContainersToRequest; ++i) {
+      AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
+      amRMClient.addContainerRequest(containerAsk);
+    }
+    numRequestedContainers.set(numTotalContainers);
+  }
+
+  @VisibleForTesting
+  NMCallbackHandler createNMCallbackHandler() {
+    return new NMCallbackHandler(this);
+  }
+
+  /**
+   * Waits until all containers have completed or the application has been
+   * marked done, joins the launch threads, stops the NM client, reports the
+   * final status to the ResourceManager and stops the RM client.
+   *
+   * <p>Interrupts received while waiting or joining no longer vanish: they
+   * are recorded, and the thread's interrupt status is restored just before
+   * returning, after the RM/NM clients have been shut down.
+   *
+   * @return {@code true} if every container completed and none failed
+   */
+  @VisibleForTesting
+  protected boolean finish() {
+    boolean interrupted = false;
+
+    // wait for completion.
+    while (!done
+        && (numCompletedContainers.get() != numTotalContainers)) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException ex) {
+        // Keep waiting as before, but remember the interrupt for the caller.
+        interrupted = true;
+      }
+    }
+
+    // Join all launched threads
+    // needed for when we time out
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage(), e);
+        interrupted = true;
+      }
+    }
+
+    // When the application completes, it should stop all running containers
+    LOG.info("Application completed. Stopping running containers");
+    nmClientAsync.stop();
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    boolean success = true;
+    if (numFailedContainers.get() == 0 &&
+        numCompletedContainers.get() == numTotalContainers) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get() + ", allocated="
+          + numAllocatedContainers.get() + ", failed="
+          + numFailedContainers.get();
+      LOG.info(appMessage);
+      success = false;
+    }
+    try {
+      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnException ex) {
+      LOG.error("Failed to unregister application", ex);
+    } catch (IOException e) {
+      LOG.error("Failed to unregister application", e);
+    }
+
+    amRMClient.stop();
+
+    if (interrupted) {
+      // Restore the interrupt status only after the RPC shutdown above.
+      Thread.currentThread().interrupt();
+    }
+    return success;
+  }
+
+  /**
+   * Receives asynchronous ResourceManager events. Each allocated container is
+   * launched on its own thread. Completions update the counters and re-request
+   * containers the framework aborted. Shutdown requests and errors mark the
+   * application as done.
+   */
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt="
+          + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info(appAttemptID + " got container status for containerID="
+            + containerStatus.getContainerId() + ", state="
+            + containerStatus.getState() + ", exitStatus="
+            + containerStatus.getExitStatus() + ", diagnostics="
+            + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here
+        assert (containerStatus.getState() == ContainerState.COMPLETE);
+        // ignore containers we know nothing about - probably from a previous
+        // attempt
+        if (!launchedContainers.contains(containerStatus.getContainerId())) {
+          LOG.info("Ignoring completed status of "
+              + containerStatus.getContainerId()
+              + "; unknown container(probably launched by previous attempt)");
+          continue;
+        }
+
+        // increment counters for completed/failed containers
+        int exitStatus = containerStatus.getExitStatus();
+        if (0 != exitStatus) {
+          // container failed
+          if (ContainerExitStatus.ABORTED != exitStatus) {
+            // the task process itself failed; counts as completed
+            numCompletedContainers.incrementAndGet();
+            numFailedContainers.incrementAndGet();
+          } else {
+            // container was killed by framework, possibly preempted
+            // we should re-try as the container was lost for some reason
+            numAllocatedContainers.decrementAndGet();
+            numRequestedContainers.decrementAndGet();
+            // we do not need to release the container as it would be done
+            // by the RM
+          }
+        } else {
+          // container completed successfully
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully." + ", containerId="
+              + containerStatus.getContainerId());
+        }
+      }
+
+      // re-request the containers that were aborted by the framework
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+      for (int i = 0; i < askCount; ++i) {
+        AMRMClient.ContainerRequest containerAsk = setupContainerAskForRM();
+        amRMClient.addContainerRequest(containerAsk);
+      }
+
+      if (numCompletedContainers.get() == numTotalContainers) {
+        done = true;
+      }
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt="
+          + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerResourceMemory"
+            + allocatedContainer.getResource().getMemory()
+            + ", containerResourceVirtualCores"
+            + allocatedContainer.getResource().getVirtualCores());
+
+        Thread launchThread = createLaunchContainerThread(allocatedContainer);
+
+        // launch and start the container on a separate thread to keep
+        // the main thread unblocked
+        // as all containers may not be allocated at one go.
+        launchThreads.add(launchThread);
+        launchedContainers.add(allocatedContainer.getId());
+        launchThread.start();
+        // NOTE(review): id is incremented after the launch thread started; a
+        // launch thread must capture its task id before this point.
+        id++;
+      }
+    }
+
+    @Override
+    public void onShutdownRequest() {
+      done = true;
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> list) {
+      // node updates are not used by this AM
+    }
+
+    /**
+     * Reports the fraction of containers that have completed. With no
+     * containers to run the job is trivially complete, so 1.0 is returned
+     * instead of the NaN a 0/0 division would produce.
+     */
+    @Override
+    public float getProgress() {
+      if (numTotalContainers <= 0) {
+        return 1.0f;
+      }
+      return (float) numCompletedContainers.get() / numTotalContainers;
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      // Log the cause first; otherwise the AM stops with no trace of why.
+      LOG.error("Error in the ResourceManager callback; stopping the AM",
+          throwable);
+      done = true;
+      amRMClient.stop();
+    }
+  }
+
+  @VisibleForTesting
+  static class NMCallbackHandler
+      implements NMClientAsync.CallbackHandler {
+
+    private ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+    private final ApplicationMaster applicationMaster;
+
+    public NMCallbackHandler(ApplicationMaster applicationMaster) {
+      this.applicationMaster = applicationMaster;
+    }
+
+    public void addContainer(ContainerId containerId, Container container) {
+      containers.putIfAbsent(containerId, container);
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to stop Container " + containerId);
+      }
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container Status: id=" + containerId + ", status=" +
+            containerStatus);
+      }
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to start Container " + containerId);
+      }
+      Container container = containers.get(containerId);
+      if (container != null) {
+        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+      }
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to start Container " + containerId);
+      containers.remove(containerId);
+      applicationMaster.numCompletedContainers.incrementAndGet();
+      applicationMaster.numFailedContainers.incrementAndGet();
+    }
+
+    @Override
+    public void onGetContainerStatusError(
+        ContainerId containerId, Throwable t) {
+      LOG.error("Failed to query the status of Container " + containerId);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop Container " + containerId);
+      containers.remove(containerId);
+    }
+  }
+
+  /**
+   * Thread body that builds the {@link ContainerLaunchContext} for one
+   * allocated container: local resources, class path and the
+   * {@code BSPRunner} command line. It then asks the NodeManager, i.e. the
+   * {@link ContainerManagementProtocol}, to start the container asynchronously.
+   */
+  private class LaunchContainerRunnable implements Runnable {
+
+    // Allocated container
+    Container container;
+
+    NMCallbackHandler containerListener;
+
+    Configuration conf;
+
+    // Snapshot of the shared static task counter "id", taken at construction.
+    // Reading "id" later in run() raced with the "id++" the allocation
+    // callback performs right after starting this thread, which could give
+    // two tasks the same id.
+    private final int taskId;
+
+    /**
+     * @param lcontainer        Allocated container
+     * @param containerListener Callback handler of the container
+     * @param conf              configuration used to resolve the file system
+     *                          and the YARN application class path
+     */
+    public LaunchContainerRunnable(
+        Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
+      this.container = lcontainer;
+      this.containerListener = containerListener;
+      this.conf = conf;
+      // NOTE(review): assumes this runnable is constructed on the allocation
+      // callback before it increments id — confirm in
+      // createLaunchContainerThread.
+      this.taskId = id;
+    }
+
+    /**
+     * Resolves the file system and registers the Hama package jar and every
+     * file under the Hama release location as local resources. Then builds
+     * the class path and the {@code BSPRunner} command line and asks the
+     * NodeManager to start the container. If the file system or the package
+     * URL cannot be resolved, the container is counted as completed and
+     * failed instead.
+     */
+    @Override
+    public void run() {
+      LOG.info("Setting up container launch container for containerid="
+          + container.getId());
+      // Now we setup a ContainerLaunchContext
+      ContainerLaunchContext ctx = Records
+          .newRecord(ContainerLaunchContext.class);
+
+      // Set the local resources
+      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+      LocalResource packageResource = Records.newRecord(LocalResource.class);
+      FileSystem fs;
+      try {
+        fs = FileSystem.get(conf);
+      } catch (IOException e) {
+        // Every step below needs the file system. Fail this container cleanly
+        // instead of dying on a null file system, which left it uncounted.
+        LOG.error("Could not obtain the FileSystem for containerid="
+            + container.getId(), e);
+        numCompletedContainers.incrementAndGet();
+        numFailedContainers.incrementAndGet();
+        return;
+      }
+      Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
+      URL packageUrl = null;
+      try {
+        packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
+          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+        LOG.info("PackageURL has been composed to " + packageUrl.toString());
+        LOG.info("Reverting packageURL to path: "
+            + ConverterUtils.getPathFromYarnURL(packageUrl));
+      } catch (URISyntaxException e) {
+        LOG.fatal("If you see this error the workarround does not work", e);
+        numCompletedContainers.incrementAndGet();
+        numFailedContainers.incrementAndGet();
+        return;
+      }
+
+      packageResource.setResource(packageUrl);
+      packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
+      packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
+      packageResource.setType(LocalResourceType.FILE);
+      packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+      localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
+
+      Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
+      URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
+          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+      LOG.info("Hama release URL has been composed to " + hamaReleaseUrl
+          .toString());
+
+      // Register every file under the release location (recursively), keyed
+      // by its file name.
+      try {
+        RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
+            hamaReleaseFile, true);
+
+        while (fileStatusListIterator.hasNext()) {
+          LocatedFileStatus lfs = fileStatusListIterator.next();
+          LocalResource localRsrc = LocalResource.newInstance(
+              ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+              lfs.getLen(), lfs.getModificationTime());
+          localResources.put(lfs.getPath().getName(), localRsrc);
+        }
+      } catch (IOException e) {
+        // NOTE(review): the container is still launched without the release
+        // files; consider failing it instead.
+        LOG.error("Failed to list Hama release files under " + hamaReleaseFile, e);
+      }
+
+      ctx.setLocalResources(localResources);
+
+      /*
+       * TODO Package classpath seems not to work if you're in pseudo distributed
+       * mode, because the resource must not be moved, it will never be unpacked.
+       * So we will check if our jar file has the file:// prefix and put it into
+       * the CP directly
+       */
+
+      StringBuilder classPathEnv = new StringBuilder(
+          ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+          .append("./*");
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        classPathEnv.append(File.pathSeparatorChar);
+        classPathEnv.append(c.trim());
+      }
+
+      List<CharSequence> vargs = new ArrayList<CharSequence>();
+      vargs.add("${JAVA_HOME}/bin/java");
+      vargs.add("-cp " + classPathEnv);
+      vargs.add(BSPRunner.class.getCanonicalName());
+
+      // Arguments passed to BSPRunner: the job's JT identifier, this task's
+      // id and the fully qualified job file path.
+      vargs.add(jobId.getJtIdentifier());
+      vargs.add(Integer.toString(taskId));
+      vargs.add(
+          new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
+              .toString());
+
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stderr");
+
+      // Get final command
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      List<String> commands = new ArrayList<String>();
+      commands.add(command.toString());
+
+      ctx.setCommands(commands);
+      // duplicate() gives each launch context its own position over the
+      // shared token buffer.
+      ctx.setTokens(allTokens.duplicate());
+      LOG.info("Starting commands: " + commands);
+
+      containerListener.addContainer(container.getId(), container);
+      nmClientAsync.startContainerAsync(container, ctx);
+    }
+  }
+
+  /**
+   * Builds a single container request for the ResourceManager. It asks for
+   * {@code containerMemory} memory and {@code containerVirtualCores} virtual
+   * cores at priority {@code requestPriority}, with no node or rack
+   * constraint.
+   *
+   * @return the container request to hand to the AMRM client
+   */
+  private AMRMClient.ContainerRequest setupContainerAskForRM() {
+    // null nodes and null racks: any host in the cluster is acceptable.
+    AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(
+        Resource.newInstance(containerMemory, containerVirtualCores),
+        null, null, Priority.newInstance(requestPriority));
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
+  }
+
+  /**
+   * Loads the submitted job configuration stored at {@code path}.
+   *
+   * <p>The file system is resolved from the URI of {@code path} itself, and
+   * the file is registered as an extra XML resource on a fresh
+   * {@link HamaConfiguration}.
+   *
+   * @param path location of the submitted job configuration file
+   * @return a configuration that includes the contents of {@code path}
+   * @throws IOException if the file system cannot be obtained or the file
+   *         cannot be opened
+   */
+  private static Configuration getSubmitConfiguration(String path)
+      throws IOException {
+    Path submitPath = new Path(path);
+    Configuration submitted = new HamaConfiguration();
+    FileSystem fileSystem = FileSystem.get(URI.create(path), submitted);
+
+    // NOTE(review): the stream is handed to Configuration and not closed here.
+    // Configuration is expected to read InputStream resources lazily (and
+    // close them after parsing) - confirm against the Hadoop version in use.
+    InputStream xml = fileSystem.open(submitPath);
+    submitted.addResource(xml);
+    return submitted;
+  }
+
+  /**
+   * Derives the ApplicationAttemptId of this process from the
+   * {@code CONTAINER_ID} environment variable, which YARN sets when it
+   * launches the container.
+   *
+   * @return the ApplicationAttemptId the current container belongs to
+   * @throws IllegalArgumentException if {@code CONTAINER_ID} is not present in
+   *         the environment
+   * @throws IOException kept in the signature for existing callers
+   */
+  private static ApplicationAttemptId getApplicationAttemptId()
+      throws IOException {
+    String containerIdKey = ApplicationConstants.Environment.CONTAINER_ID.name();
+    String containerIdValue = System.getenv(containerIdKey);
+    if (containerIdValue == null) {
+      // Name the variable that is actually missing so the failure is easy to
+      // diagnose; the attempt id is only derived from it.
+      throw new IllegalArgumentException(containerIdKey
+          + " not set in the environment; cannot determine the ApplicationAttemptId");
+    }
+
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdValue);
+    ApplicationAttemptId attemptId = containerId.getApplicationAttemptId();
+    LOG.info("Running as application attempt " + attemptId + " in container "
+        + containerId);
+    return attemptId;
+  }
+
+  /**
+   * Creates the sync server for this job from {@code jobConf}, initialises it
+   * and hands its start-up to {@code threadPool}.
+   *
+   * <p>The method returns as soon as the start task has been submitted; it
+   * does not wait for the server to come up and does not write any address
+   * into the configuration. Exceptions raised by {@code start()} are logged by
+   * the submitted {@link ZKServerThread}, not propagated to this caller.
+   *
+   * @throws Exception if the sync server cannot be created or initialised
+   */
+  private void startSyncServer() throws Exception {
+    syncServer = SyncServiceFactory.getSyncServer(jobConf);
+    syncServer.init(jobConf);
+    threadPool.submit(new ZKServerThread(syncServer));
+  }
+
+  /**
+   * Runnable wrapper that calls {@link SyncServer#start()} on the thread that
+   * runs it. Any exception thrown by {@code start()} is logged rather than
+   * propagated, so submitting this task never fails because of the server.
+   */
+  private static class ZKServerThread implements Runnable {
+    private final SyncServer server;
+
+    ZKServerThread(SyncServer server) {
+      this.server = server;
+    }
+
+    @Override
+    public void run() {
+      try {
+        server.start();
+      } catch (Exception e) {
+        LOG.error("Error running SyncServer.", e);
+      }
+    }
+  }
+
+  /**
+   * Starts the two RPC servers of this ApplicationMaster, both bound to
+   * {@code hostname}: the client-facing server on {@code clientPort}, and the
+   * task (umbilical) server on a port obtained from
+   * {@code BSPNetUtils.getFreePort(10000)}.
+   *
+   * <p>This method sets "hama.umbilical.address" on {@code jobConf} and
+   * therefore must run BEFORE the submit configuration is rewritten.
+   *
+   * @throws IOException if either server cannot be created or started
+   */
+  private void startRPCServers() throws IOException {
+    // Start the RPC server which talks to the client. RPC.getServer expects
+    // the object that implements the served protocol; passing BSPClient.class
+    // registered a java.lang.Class object, so client calls such as
+    // getCurrentSuperStep() could not be dispatched to this master.
+    this.clientServer = RPC.getServer(this, hostname, clientPort, jobConf);
+    this.clientServer.start();
+
+    // Start the RPC server which talks to the tasks.
+    this.taskServerPort = BSPNetUtils.getFreePort(10000);
+    this.taskServer = RPC.getServer(this, hostname, taskServerPort, jobConf);
+    this.taskServer.start();
+
+    // Readjust the configuration so the tasks know where to reach us.
+    this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
+  }
+
+  /**
+   * Writes {@code conf} as XML to {@code path} so that values set on the
+   * configuration after submission are persisted.
+   *
+   * @param path location of the job configuration file to write
+   * @param conf configuration to serialise; also used to resolve the file
+   *        system of {@code path}
+   * @throws IOException if the file cannot be created, written or closed
+   */
+  private static void rewriteSubmitConfiguration(String path, Configuration conf)
+      throws IOException {
+    Path jobSubmitPath = new Path(path);
+    // Resolve the file system from the path itself (as getSubmitConfiguration
+    // does) instead of the default file system, so a fully qualified path on
+    // another file system is written where it was read from. Unqualified paths
+    // still resolve to the default file system.
+    FileSystem fs = jobSubmitPath.getFileSystem(conf);
+    FSDataOutputStream out = fs.create(jobSubmitPath);
+    try {
+      conf.writeXml(out);
+    } finally {
+      // Close even when serialisation fails so the stream is never leaked.
+      out.close();
+    }
+
+    LOG.info("Written new configuration back to " + path);
+  }
+
+  /**
+   * Determines how much memory, in megabytes, each worker container requests.
+   *
+   * <p>An explicit "bsp.child.mem.in.mb" setting takes precedence. When it is
+   * absent, the amount is derived from the -Xmx flag in "bsp.child.java.opts".
+   *
+   * @param conf job configuration to read the settings from
+   * @return the container memory in megabytes
+   * @throws IllegalArgumentException if "bsp.child.mem.in.mb" is not an
+   *         integer, or if the child opts cannot be parsed
+   */
+  private int getMemoryRequirements(Configuration conf) {
+    String explicitMb = conf.get("bsp.child.mem.in.mb");
+    if (explicitMb != null) {
+      return Integer.parseInt(explicitMb);
+    }
+
+    LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+    return getMemoryFromOptString(conf.get("bsp.child.java.opts"));
+  }
+
+  /**
+   * Extracts the maximum heap size, in megabytes, from a JVM option string
+   * such as {@code "-Xmx512m -XX:+UseG1GC"}.
+   *
+   * <p>Only the value of the {@code -Xmx} flag is examined; other options may
+   * appear before or after it. If the flag occurs several times, the last
+   * occurrence is used, because the JVM honours the last value given. The
+   * value must end in {@code m}/{@code M} (megabytes) or {@code g}/{@code G}
+   * (gigabytes).
+   *
+   * @param opts the child JVM options; may be {@code null}
+   * @return the heap size in MB, or 256 when {@code opts} is {@code null} or
+   *         contains no {@code -Xmx} flag
+   * @throws IllegalArgumentException if the {@code -Xmx} value is missing, has
+   *         an unsupported unit, is not a non-negative number, or does not fit
+   *         in an {@code int} number of megabytes
+   */
+  private static int getMemoryFromOptString(String opts) {
+    final int DEFAULT_MEMORY_MB = 256;
+
+    if (opts == null) {
+      return DEFAULT_MEMORY_MB;
+    }
+
+    int flagIndex = opts.lastIndexOf("-Xmx");
+    if (flagIndex < 0) {
+      LOG.info(
+          "No \"-Xmx\" option found in child opts, using default amount of memory!");
+      return DEFAULT_MEMORY_MB;
+    }
+
+    // Isolate the flag's own value, e.g. "512m" out of "-Xmx512m -Dfoo=bar".
+    // Taking the whole remainder of the string breaks as soon as any other
+    // option follows -Xmx.
+    int start = flagIndex + "-Xmx".length();
+    int end = start;
+    while (end < opts.length() && !Character.isWhitespace(opts.charAt(end))) {
+      end++;
+    }
+    String value = opts.substring(start, end);
+
+    char qualifier = value.isEmpty()
+        ? '\0'
+        : Character.toLowerCase(value.charAt(value.length() - 1));
+    final int multiplier;
+    switch (qualifier) {
+      case 'm':
+        multiplier = 1;
+        break;
+      case 'g':
+        multiplier = 1024;
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
+                + opts);
+    }
+
+    int memory;
+    try {
+      memory = Integer.parseInt(value.substring(0, value.length() - 1));
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          "Invalid \"-Xmx\" value in \"bsp.child.java.opts\": " + opts, e);
+    }
+    // Reject negative sizes and sizes whose MB value would overflow an int.
+    if (memory < 0 || memory > Integer.MAX_VALUE / multiplier) {
+      throw new IllegalArgumentException(
+          "Invalid \"-Xmx\" value in \"bsp.child.java.opts\": " + opts);
+    }
+    return memory * multiplier;
+  }
+
+  /**
+   * Wraps the launch of an allocated container in a new thread. The thread is
+   * returned unstarted. The method is package-private (see
+   * {@code @VisibleForTesting}) so tests can reach it.
+   *
+   * @param allocatedContainer the container to launch a worker in
+   * @return an unstarted thread running a LaunchContainerRunnable for it
+   */
+  @VisibleForTesting
+  Thread createLaunchContainerThread(Container allocatedContainer) {
+    return new Thread(
+        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf));
+  }
+
+  @Override
+  public LongWritable getCurrentSuperStep() {
+    return new LongWritable(superstep);
+  }
+
+  /**
+   * Builds the BSPTask description for the given task attempt.
+   *
+   * <p>When input splits exist, the task receives the split at index
+   * {@code taskid.id}. Otherwise it gets a NullInputSplit with an empty split
+   * payload.
+   *
+   * @param taskid the task attempt requesting its task description
+   * @return the task to run for {@code taskid}
+   */
+  @Override
+  public Task getTask(TaskAttemptID taskid) throws IOException {
+    if (splits == null) {
+      return new BSPTask(jobId, jobFile, taskid, taskid.id,
+          NullInputFormat.NullInputSplit.class.getName(), new BytesWritable());
+    }
+
+    BSPJobClient.RawSplit split = splits[taskid.id];
+    return new BSPTask(jobId, jobFile, taskid, taskid.id, split.getClassName(),
+        split.getBytes());
+  }
+
+  /**
+   * Answers a liveness ping for a task attempt. This implementation ignores
+   * {@code taskid} and always returns {@code false}.
+   * NOTE(review): confirm how the task side interprets {@code false}; it may
+   * be read as the master having lost track of the task.
+   */
+  @Override
+  public boolean ping(TaskAttemptID taskid) throws IOException {
+    return false;
+  }
+
+  /**
+   * Callback for a task attempt reporting completion. Currently a no-op.
+   */
+  @Override
+  public void done(TaskAttemptID taskid) throws IOException {
+    // NOTE(review): completion is not recorded anywhere in this method.
+  }
+
+  @Override
+  public void fsError(TaskAttemptID taskId, String message) throws IOException {
+
+  }
+
+  @Override
+  public void fatalError(TaskAttemptID taskId, String message)
+      throws IOException {
+
+  }
+
+  /**
+   * Receives a status report from a task attempt.
+   *
+   * <p>The tracked superstep only moves forward: it is raised when the task
+   * reports a higher superstep count. The task's counters are added to
+   * {@code globalCounter}.
+   * NOTE(review): reports may arrive concurrently from several RPC handler
+   * threads, and the check-then-set below is unsynchronized. If tasks send
+   * cumulative counters with every update, adding them each time would
+   * over-count. Confirm both.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    if (superstep < taskStatus.getSuperstepCount()) {
+      superstep = taskStatus.getSuperstepCount();
+      LOG.info("Now in superstep " + superstep);
+    }
+
+    globalCounter.incrAllCounters(taskStatus.getCounters());
+    return true;
+  }
+
+  /**
+   * Returns the port assigned to a task attempt. This implementation assigns
+   * none and returns 0 for every task.
+   * NOTE(review): confirm callers treat 0 as "no assigned port".
+   */
+  @Override
+  public int getAssignedPortNum(TaskAttemptID taskid) {
+    return 0;
+  }
+
+  /**
+   * Stops the client and task RPC servers. A server that was never created
+   * (for example because start-up failed part-way) is skipped. The task
+   * server is stopped even if stopping the client server throws.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (this.clientServer != null) {
+        this.clientServer.stop();
+      }
+    } finally {
+      if (this.taskServer != null) {
+        this.taskServer.stop();
+      }
+    }
+  }
+
+  /**
+   * Reports the RPC protocol version. BSPClient.versionID is returned for
+   * every requested protocol, including the task-facing one served by
+   * {@code taskServer}.
+   * NOTE(review): confirm the task protocol's versionID equals
+   * BSPClient.versionID, otherwise task proxies may see a version mismatch.
+   */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return BSPClient.versionID;
+  }
+}

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1683197&r1=1683196&r2=1683197&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Tue Jun  2 23:26:58 2015
@@ -33,10 +33,9 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -131,7 +130,6 @@ public class YARNBSPJobClient extends BS
     yarnConf = new YarnConfiguration(conf);
     yarnClient = YarnClient.createYarnClient();
     yarnClient.init(yarnConf);
-    yarnClient.start();
   }
 
   @Override
@@ -155,6 +153,7 @@ public class YARNBSPJobClient extends BS
       LOG.debug("Retrieved username: " + s);
     }
 
+    yarnClient.start();
     try {
       YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
       LOG.info("Got Cluster metric info from ASM"
@@ -188,14 +187,16 @@ public class YARNBSPJobClient extends BS
         }
       }
 
-      GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
-      GetNewApplicationResponse response = job.getApplicationsManager().getNewApplication(request);
-      id = response.getApplicationId();
+      // Get a new application id
+      YarnClientApplication app = yarnClient.createApplication();
+
 
       // Create a new ApplicationSubmissionContext
-      ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
-      // set the ApplicationId
-      appContext.setApplicationId(this.id);
+      //ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+      ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+
+      id = appContext.getApplicationId();
+
       // set the application name
       appContext.setApplicationName(job.getJobName());
 
@@ -227,7 +228,11 @@ public class YARNBSPJobClient extends BS
       localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
 
       // add hama related jar files to localresources for container
-      List<File> hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+      List<File> hamaJars;
+      if (System.getProperty("hama.home.dir") != null)
+        hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+      else
+        hamaJars = localJarfromPath(getConf().get("hama.home.dir"));
       String hamaPath = getSystemDir() + "/hama";
       for (File fileEntry : hamaJars) {
         addToLocalResources(fs, fileEntry.getCanonicalPath(),
@@ -266,7 +271,7 @@ public class YARNBSPJobClient extends BS
       Vector<CharSequence> vargs = new Vector<CharSequence>(5);
       vargs.add("${JAVA_HOME}/bin/java");
       vargs.add("-cp " + classPathEnv + "");
-      vargs.add(BSPApplicationMaster.class.getCanonicalName());
+      vargs.add(ApplicationMaster.class.getCanonicalName());
       vargs.add(submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
 
       vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-appmaster.stdout");

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1683197&r1=1683196&r2=1683197&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Tue Jun  2 23:26:58 2015
@@ -74,12 +74,13 @@ public class YarnSerializePrinting {
       }
     }
 
-    //fs.delete(OUTPUT_PATH, true);
+    fs.delete(OUTPUT_PATH, true);
   }
 
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     HamaConfiguration conf = new HamaConfiguration();
+    conf.set("hama.home.dir", System.getenv().get("HAMA_HOME"));
 
     YARNBSPJob job = new YARNBSPJob(conf);
     job.setBspClass(HelloBSP.class);



Mime
View raw message