hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1684349 - in /hama/trunk: conf/log4j.properties core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
Date Tue, 09 Jun 2015 08:27:29 GMT
Author: edwardyoon
Date: Tue Jun  9 08:27:29 2015
New Revision: 1684349

URL: http://svn.apache.org/r1684349
Log:
HAMA-939: Refactor code that was implemented using an out-of-date status response

Modified:
    hama/trunk/conf/log4j.properties
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java

Modified: hama/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hama/trunk/conf/log4j.properties?rev=1684349&r1=1684348&r2=1684349&view=diff
==============================================================================
--- hama/trunk/conf/log4j.properties (original)
+++ hama/trunk/conf/log4j.properties Tue Jun  9 08:27:29 2015
@@ -83,4 +83,3 @@ log4j.appender.console.layout.Conversion
 #log4j.logger.org.apache.hadoop.dfs=DEBUG
 #log4j.logger.org.apache.hama=DEBUG
 #log4j.logger.org.apache.zookeeper=DEBUG
-#log4j.logger.org.apache.avro=DEBUG

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1684349&r1=1684348&r2=1684349&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Tue Jun  9 08:27:29 2015
@@ -237,16 +237,26 @@ public class ZooKeeperSyncClientImpl ext
   @Override
   public void register(BSPJobID jobId, TaskAttemptID taskId,
       String hostAddress, long port) {
-    try {
-      String jobRegisterKey = constructKey(jobId, "peers");
-      if (zk.exists(jobRegisterKey, false) == null) {
+    int count = 0;
+    String jobRegisterKey = constructKey(jobId, "peers");
+    Stat stat = null;
+
+    LOG.info("TaskAttemptID : " + taskId);
+    while (stat != null) {
+      try {
+        stat = zk.exists(jobRegisterKey, false);
         zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT);
+        Thread.sleep(1000);
+      } catch (Exception e) {
+        LOG.debug(e); // ignore it.
+      }
+      count++;
+
+      // retry 10 times.
+      if (count > 9) {
+        throw new RuntimeException("can't create root node.");
       }
-    } catch (KeeperException e) {
-      LOG.error(e);
-    } catch (InterruptedException e) {
-      LOG.error(e);
     }
     registerTask(jobId, hostAddress, port, taskId);
   }

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java?rev=1684349&r1=1684348&r2=1684349&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java Tue Jun  9 08:27:29 2015
@@ -80,7 +80,6 @@ public class ApplicationMaster  implemen
   private String hostname;
   private int clientPort;
   private FileSystem fs;
-  private static int id = 0;
 
   private volatile long superstep;
   private Counters globalCounter = new Counters();
@@ -186,6 +185,7 @@ public class ApplicationMaster  implemen
       LogManager.shutdown();
       ExitUtil.terminate(1, t);
     } finally {
+      LOG.info("Stop SyncServer and RPCServer.");
       appMaster.close();
     }
     
@@ -491,6 +491,7 @@ public class ApplicationMaster  implemen
     public void onContainersAllocated(List<Container> allocatedContainers) {
       LOG.info("Got response from RM for container ask, allocatedCnt="
           + allocatedContainers.size());
+
       numAllocatedContainers.addAndGet(allocatedContainers.size());
       for (Container allocatedContainer : allocatedContainers) {
         LOG.info("Launching shell command on a new container."
@@ -502,10 +503,8 @@ public class ApplicationMaster  implemen
             + allocatedContainer.getResource().getMemory()
             + ", containerResourceVirtualCores"
             + allocatedContainer.getResource().getVirtualCores());
-        // + ", containerToken"
-        // +allocatedContainer.getContainerToken().getIdentifier().toString());
 
-        Thread launchThread = createLaunchContainerThread(allocatedContainer);
+        Thread launchThread = createLaunchContainerThread(allocatedContainer, allocatedContainer.getId().getContainerId());
 
         // launch and start the container on a separate thread to keep
         // the main thread unblocked
@@ -513,7 +512,6 @@ public class ApplicationMaster  implemen
         launchThreads.add(launchThread);
         launchedContainers.add(allocatedContainer.getId());
         launchThread.start();
-        id++;
       }
     }
 
@@ -621,15 +619,19 @@ public class ApplicationMaster  implemen
 
     Configuration conf;
 
+    long taskAttemptId;
+
     /**
      * @param lcontainer        Allocated container
      * @param containerListener Callback handler of the container
      */
     public LaunchContainerRunnable(
-        Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
+        Container lcontainer, NMCallbackHandler containerListener,
+        Configuration conf, long taskAttemptId) {
       this.container = lcontainer;
       this.containerListener = containerListener;
       this.conf = conf;
+      this.taskAttemptId = taskAttemptId;
     }
 
     /**
@@ -725,7 +727,7 @@ public class ApplicationMaster  implemen
       vargs.add(BSPRunner.class.getCanonicalName());
 
       vargs.add(jobId.getJtIdentifier());
-      vargs.add(Integer.toString(id));
+      vargs.add(Long.toString(taskAttemptId));
       vargs.add(
           new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
               .toString());
@@ -805,7 +807,6 @@ public class ApplicationMaster  implemen
           "ApplicationAttemptId not set in the environment");
     }
 
-    LOG.info("app attempt id!!!");
     ContainerId containerId = ConverterUtils.toContainerId(envs
         .get(ApplicationConstants.Environment.CONTAINER_ID.name()));
     return containerId.getApplicationAttemptId();
@@ -930,9 +931,9 @@ public class ApplicationMaster  implemen
   }
 
   @VisibleForTesting
-  Thread createLaunchContainerThread(Container allocatedContainer) {
+  Thread createLaunchContainerThread(Container allocatedContainer, long taskAttemptId) {
     LaunchContainerRunnable runnableLaunchContainer =
-        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf);
+        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf, taskAttemptId);
     return new Thread(runnableLaunchContainer);
   }
 
@@ -1001,6 +1002,7 @@ public class ApplicationMaster  implemen
   public void close() throws IOException {
     this.clientServer.stop();
     this.taskServer.stop();
+    this.syncServer.stopServer();
   }
 
   @Override



Mime
View raw message