apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [16/29] incubator-apex-core git commit: Using YarnConfiguration getSocketAddr to get the individual rm addresses in HA mode as it will lookup all the necessary configuration to return the address namely yarn.resourcemanager.address followed by yarn.resou
Date Sun, 11 Oct 2015 16:05:31 GMT
Using YarnConfiguration getSocketAddr to get the individual rm addresses in HA mode as it will
lookup all the necessary configuration to return the address namely yarn.resourcemanager.address
followed by yarn.resourcemanager.hostname


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45e891c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45e891c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45e891c7

Branch: refs/heads/feature-module
Commit: 45e891c7cd7f83a34a4eefc0de10306ba139d2da
Parents: bf72215
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Fri Sep 25 01:39:48 2015 -0700
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Sun Oct 4 09:54:50 2015 -0700

----------------------------------------------------------------------
 .../stram/client/StramClientUtils.java          | 28 +++++++++++++-------
 .../com/datatorrent/stram/util/ConfigUtils.java |  1 +
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d596a73..45d2feb 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -22,10 +22,6 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
 import org.mozilla.javascript.Scriptable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +53,10 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.DTLoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.StreamingApplication;
 
 import com.datatorrent.stram.StramClient;
@@ -246,10 +245,7 @@ public class StramClientUtils
       for (String rmId : ConfigUtils.getRMHAIds(conf)) {
         LOG.info("Yarn Resource Manager id: {}", rmId);
         // Set RM_ID to get the corresponding RM_ADDRESS
-        services.add(SecurityUtil.buildTokenService(NetUtils.createSocketAddr(
-                conf.get(RM_HOSTNAME_PREFIX + rmId),
-                YarnConfiguration.DEFAULT_RM_PORT,
-                RM_HOSTNAME_PREFIX + rmId)).toString());
+        services.add(SecurityUtil.buildTokenService(getRMHAAddress(rmId)).toString());
       }
       Text rmTokenService = new Text(Joiner.on(',').join(services));
 
@@ -284,6 +280,20 @@ public class StramClientUtils
       credentials.addToken(token.getService(), token);
     }
 
+    public InetSocketAddress getRMHAAddress(String rmId)
+    {
+      YarnConfiguration yarnConf;
+      if (conf instanceof YarnConfiguration) {
+        yarnConf = (YarnConfiguration)conf;
+      } else {
+        yarnConf = new YarnConfiguration(conf);
+      }
+      yarnConf.set(ConfigUtils.RM_HA_ID, rmId);
+      InetSocketAddress socketAddr = yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+      yarnConf.unset(ConfigUtils.RM_HA_ID);
+      return socketAddr;
+    }
+
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
index 481815f..68fe27b 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
@@ -39,6 +39,7 @@ public class ConfigUtils
   private static final String RM_HA_PREFIX = YarnConfiguration.RM_PREFIX + "ha.";
   public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
   public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
+  public static final String RM_HA_ID = RM_HA_PREFIX + "id";
   public static final boolean DEFAULT_RM_HA_ENABLED = false;
 
   private static String yarnLogDir;


Mime
View raw message