atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suma...@apache.org
Subject [2/2] incubator-atlas git commit: ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai)
Date Tue, 05 Apr 2016 00:16:14 GMT
ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/98f4d40a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/98f4d40a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/98f4d40a

Branch: refs/heads/master
Commit: 98f4d40a17668c8ee65b58fcbcb6d09ea6a682e5
Parents: c1d4e7c
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Mon Apr 4 17:15:46 2016 -0700
Committer: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Committed: Mon Apr 4 17:15:46 2016 -0700

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |  10 +-
 .../main/java/org/apache/atlas/AtlasClient.java | 445 ++++++++++++++-----
 .../org/apache/atlas/AtlasServerEnsemble.java   |  52 +++
 .../java/org/apache/atlas/ResourceCreator.java  |  28 ++
 .../atlas/security/SecurityProperties.java      |  42 --
 .../java/org/apache/atlas/AtlasClientTest.java  | 323 ++++++++++++--
 common/pom.xml                                  |   5 +
 .../org/apache/atlas/ha/HAConfiguration.java    | 173 +++++++
 .../atlas/security/SecurityProperties.java      |  46 ++
 .../apache/atlas/ha/HAConfigurationTest.java    |  67 +++
 release-log.txt                                 |   1 +
 .../apache/atlas/ha/AtlasServerIdSelector.java  |  82 ++++
 .../org/apache/atlas/ha/HAConfiguration.java    | 196 --------
 .../atlas/ha/AtlasServerIdSelectorTest.java     |  68 +++
 .../apache/atlas/ha/HAConfigurationTest.java    |  90 ----
 .../org/apache/atlas/examples/QuickStart.java   |   3 +-
 .../service/ActiveInstanceElectorService.java   |   3 +-
 .../atlas/web/service/CuratorFactory.java       |   2 +-
 .../security/NegativeSSLAndKerberosTest.java    |   2 +-
 .../atlas/web/security/SSLAndKerberosTest.java  |   2 +-
 .../org/apache/atlas/web/security/SSLTest.java  |   2 +-
 21 files changed, 1179 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 50a5311..3a802d7 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -96,7 +96,15 @@ public class HiveMetaStoreBridge {
                                UserGroupInformation ugi) throws Exception {
         this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
                 Hive.get(hiveConf),
-                new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser));
+                atlasConf, doAsUser, ugi);
+    }
+
+    HiveMetaStoreBridge(String clusterName, Hive hiveClient,
+                        Configuration atlasConf, String doAsUser, UserGroupInformation ugi) {
+        this.clusterName = clusterName;
+        this.hiveClient = hiveClient;
+        String baseUrls = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
+        this.atlasClient = new AtlasClient(ugi, doAsUser, baseUrls.split(","));
     }
 
     HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index 0bb5264..21f21eb 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
@@ -42,6 +43,8 @@ import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -91,23 +94,66 @@ public class AtlasClient {
     public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
     public static final String UNKNOWN_STATUS = "Unknown status";
 
-    private WebResource service;
+    public static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+    // Setting the default value based on testing failovers while client code like quickstart is running.
+    public static final int DEFAULT_NUM_RETRIES = 4;
+    public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+    // Setting the default value based on testing failovers while client code like quickstart is running.
+    // With number of retries, this gives a total time of about 20s for the server to start.
+    public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
 
-    protected AtlasClient() {
-        //do nothing. For LocalAtlasClient
-    }
+    private WebResource service;
+    private AtlasClientContext atlasClientContext;
+    private Configuration configuration;
 
+    /**
+     * Create a new AtlasClient.
+     *
+     * @param baseUrl The URL of the Atlas server to connect to.
+     */
     public AtlasClient(String baseUrl) {
         this(baseUrl, null, null);
     }
 
+    /**
+     * Create a new Atlas Client.
+     * @param baseUrl The URL of the Atlas server to connect to.
+     * @param ugi The {@link UserGroupInformation} of logged in user.
+     * @param doAsUser The user on whose behalf queries will be executed.
+     */
     public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
+        initializeState(new String[] {baseUrl}, ugi, doAsUser);
+    }
+
+    /**
+     * Create a new Atlas client.
+     * @param ugi The {@link UserGroupInformation} of logged in user, can be null in unsecure mode.
+     * @param doAsUser The user on whose behalf queries will be executed, can be null in unsecure mode.
+     * @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
+     *                 High Availability mode. The client will automatically determine the
+     *                 active instance on startup and also when there is a scenario of
+     *                 failover.
+     */
+    public AtlasClient(UserGroupInformation ugi, String doAsUser, String... baseUrls) {
+        initializeState(baseUrls, ugi, doAsUser);
+    }
+
+    private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
+        configuration = getClientProperties();
+        Client client = getClient(configuration, ugi, doAsUser);
+        String activeServiceUrl = determineActiveServiceURL(baseUrls, client);
+        atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser);
+        service = client.resource(UriBuilder.fromUri(activeServiceUrl).build());
+    }
+
+    @VisibleForTesting
+    protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
         DefaultClientConfig config = new DefaultClientConfig();
         Configuration clientConfig = null;
         int readTimeout = 60000;
         int connectTimeout = 60000;
         try {
-            clientConfig = getClientProperties();
+            clientConfig = configuration;
             if (clientConfig.getBoolean(TLS_ENABLED, false)) {
                 // create an SSL properties configuration if one doesn't exist.  SSLFactory expects a file, so forced
                 // to create a
@@ -124,26 +170,109 @@ public class AtlasClient {
             SecureClientUtils.getClientConnectionHandler(config, clientConfig, doAsUser, ugi);
 
         Client client = new Client(handler, config);
-        client.resource(UriBuilder.fromUri(baseUrl).build());
         client.setReadTimeout(readTimeout);
         client.setConnectTimeout(connectTimeout);
+        return client;
+    }
 
-        service = client.resource(UriBuilder.fromUri(baseUrl).build());
+    @VisibleForTesting
+    protected String determineActiveServiceURL(String[] baseUrls, Client client) {
+        if (baseUrls.length == 0) {
+            throw new IllegalArgumentException("Base URLs cannot be null or empty");
+        }
+        String baseUrl;
+        AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls);
+        if (atlasServerEnsemble.hasSingleInstance()) {
+            baseUrl = atlasServerEnsemble.firstURL();
+            LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl);
+            return baseUrl;
+        } else {
+            try {
+                baseUrl = selectActiveServerAddress(client, atlasServerEnsemble);
+            } catch (AtlasServiceException e) {
+                LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e);
+                throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e);
+            }
+        }
+        return baseUrl;
+    }
+
+    private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble)
+            throws AtlasServiceException {
+        List<String> serverInstances = serverEnsemble.getMembers();
+        String activeServerAddress = null;
+        for (String serverInstance : serverInstances) {
+            LOG.info("Trying with address {}", serverInstance);
+            activeServerAddress = getAddressIfActive(client, serverInstance);
+            if (activeServerAddress != null) {
+                LOG.info("Found service {} as active service.", serverInstance);
+                break;
+            }
+        }
+        if (activeServerAddress != null)
+            return activeServerAddress;
+        else
+            throw new AtlasServiceException(API.STATUS, new RuntimeException("Could not find any active instance"));
     }
 
-    // for testing
-    AtlasClient(WebResource service) {
+    private String getAddressIfActive(Client client, String serverInstance) {
+        String activeServerAddress = null;
+        for (int i = 0; i < getNumberOfRetries(); i++) {
+            try {
+                WebResource service = client.resource(UriBuilder.fromUri(serverInstance).build());
+                String adminStatus = getAdminStatus(service);
+                if (adminStatus.equals("ACTIVE")) {
+                    activeServerAddress = serverInstance;
+                    break;
+                } else {
+                    LOG.info("Service {} is not active.. will retry.", serverInstance);
+                }
+            } catch (Exception e) {
+                LOG.error("Could not get status from service {} after {} tries.", serverInstance, i, e);
+            }
+            sleepBetweenRetries();
+            LOG.warn("Service {} is not active.", serverInstance);
+        }
+        return activeServerAddress;
+    }
+
+    private void sleepBetweenRetries(){
+        try {
+            Thread.sleep(getSleepBetweenRetriesMs());
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted from sleeping between retries.", e);
+        }
+    }
+
+    private int getSleepBetweenRetriesMs() {
+        return configuration.getInt(ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, DEFAULT_SLEEP_BETWEEN_RETRIES_MS);
+    }
+
+    private int getNumberOfRetries() {
+        return configuration.getInt(ATLAS_CLIENT_HA_RETRIES_KEY, DEFAULT_NUM_RETRIES);
+    }
+
+    @VisibleForTesting
+    AtlasClient(WebResource service, Configuration configuration) {
         this.service = service;
+        this.configuration = configuration;
     }
 
-    protected Configuration getClientProperties() throws AtlasException {
-        return ApplicationProperties.get();
+    protected Configuration getClientProperties() {
+        try {
+            if (configuration == null) {
+                configuration = ApplicationProperties.get();
+            }
+        } catch (AtlasException e) {
+            LOG.error("Exception while loading configuration.", e);
+        }
+        return configuration;
     }
 
     public boolean isServerReady() throws AtlasServiceException {
         WebResource resource = getResource(API.VERSION);
         try {
-            callAPIWithResource(API.VERSION, resource);
+            callAPIWithResource(API.VERSION, resource, null);
             return true;
         } catch (ClientHandlerException che) {
             return false;
@@ -164,9 +293,31 @@ public class AtlasClient {
      * @throws AtlasServiceException if there is a HTTP error.
      */
     public String getAdminStatus() throws AtlasServiceException {
+        return getAdminStatus(service);
+    }
+
+    private void handleClientHandlerException(ClientHandlerException che) {
+        if (isRetryableException(che)) {
+            atlasClientContext.getClient().destroy();
+            LOG.warn("Destroyed current context while handling ClientHandlerEception.");
+            LOG.warn("Will retry and create new context.");
+            sleepBetweenRetries();
+            initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(),
+                    atlasClientContext.getDoAsUser());
+            return;
+        }
+        throw che;
+    }
+
+    private boolean isRetryableException(ClientHandlerException che) {
+        return che.getCause().getClass().equals(IOException.class)
+                || che.getCause().getClass().equals(ConnectException.class);
+    }
+
+    private String getAdminStatus(WebResource service) throws AtlasServiceException {
         String result = UNKNOWN_STATUS;
-        WebResource resource = getResource(API.STATUS);
-        JSONObject response = callAPIWithResource(API.STATUS, resource);
+        WebResource resource = getResource(service, API.STATUS);
+        JSONObject response = callAPIWithResource(API.STATUS, resource, null);
         try {
             result = response.getString("Status");
         } catch (JSONException e) {
@@ -282,9 +433,8 @@ public class AtlasClient {
     }
 
     public String getType(String typeName) throws AtlasServiceException {
-        WebResource resource = getResource(API.GET_TYPE, typeName);
         try {
-            JSONObject response = callAPIWithResource(API.GET_TYPE, resource);
+            JSONObject response = callAPI(API.GET_TYPE, null, typeName);;
             return response.getString(DEFINITION);
         } catch (AtlasServiceException e) {
             if (Response.Status.NOT_FOUND.equals(e.getStatus())) {
@@ -366,11 +516,37 @@ public class AtlasClient {
      * @param attribute  property key
      * @param value     property value
      */
-    public void updateEntityAttribute(String guid, String attribute, String value) throws AtlasServiceException {
-        API api = API.UPDATE_ENTITY_PARTIAL;
-        WebResource resource = getResource(api, guid);
-        resource = resource.queryParam(ATTRIBUTE_NAME, attribute);
-        callAPIWithResource(api, resource, value);
+    public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
+        callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                API api = API.UPDATE_ENTITY_PARTIAL;
+                WebResource resource = getResource(api, guid);
+                resource = resource.queryParam(ATTRIBUTE_NAME, attribute);
+                return resource;
+            }
+        });
+    }
+
+    @VisibleForTesting
+    JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator)
+            throws AtlasServiceException {
+        for (int i = 0; i < getNumberOfRetries(); i++) {
+            WebResource resource = resourceCreator.createResource();
+            try {
+                LOG.info("using resource {} for {} times", resource.getURI(), i);
+                JSONObject result = callAPIWithResource(api, resource, requestObject);
+                return result;
+            } catch (ClientHandlerException che) {
+                if (i==(getNumberOfRetries()-1)) {
+                    throw che;
+                }
+                LOG.warn("Handled exception in calling api {}", api.getPath(), che);
+                LOG.warn("Exception's cause: {}", che.getCause().getClass());
+                handleClientHandlerException(che);
+            }
+        }
+        throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries."));
     }
 
     /**
@@ -392,15 +568,20 @@ public class AtlasClient {
      * @param uniqueAttributeValue Attribute Value that uniquely identifies the entity
      * @param entity entity definition
      */
-    public String updateEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue,
+    public String updateEntity(final String entityType, final String uniqueAttributeName, final String uniqueAttributeValue,
                                Referenceable entity) throws AtlasServiceException {
-        API api = API.UPDATE_ENTITY_PARTIAL;
-        WebResource resource = getResource(api, "qualifiedName");
-        resource = resource.queryParam(TYPE, entityType);
-        resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
-        resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
+        final API api = API.UPDATE_ENTITY_PARTIAL;
         String entityJson = InstanceSerialization.toJson(entity, true);
-        JSONObject response = callAPIWithResource(api, resource, entityJson);
+        JSONObject response = callAPIWithRetries(api, entityJson, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                WebResource resource = getResource(api, "qualifiedName");
+                resource = resource.queryParam(TYPE, entityType);
+                resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
+                resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
+                return resource;
+            }
+        });
         try {
             return response.getString(GUID);
         } catch (JSONException e) {
@@ -415,13 +596,18 @@ public class AtlasClient {
      * @return List of deleted entity guids
      * @throws AtlasServiceException
      */
-    public List<String> deleteEntities(String ... guids) throws AtlasServiceException {
-        API api = API.DELETE_ENTITIES;
-        WebResource resource = getResource(api);
-        for (String guid : guids) {
-            resource = resource.queryParam(GUID.toLowerCase(), guid);
-        }
-        JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource);
+    public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+        JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                API api = API.DELETE_ENTITIES;
+                WebResource resource = getResource(api);
+                for (String guid : guids) {
+                    resource = resource.queryParam(GUID.toLowerCase(), guid);
+                }
+                return resource;
+            }
+        });
         return extractResults(jsonResponse, GUID);
     }
     
@@ -457,12 +643,18 @@ public class AtlasClient {
      * @return result object
      * @throws AtlasServiceException
      */
-    public Referenceable getEntity(String entityType, String attribute, String value) throws AtlasServiceException {
-        WebResource resource = getResource(API.GET_ENTITY);
-        resource = resource.queryParam(TYPE, entityType);
-        resource = resource.queryParam(ATTRIBUTE_NAME, attribute);
-        resource = resource.queryParam(ATTRIBUTE_VALUE, value);
-        JSONObject jsonResponse = callAPIWithResource(API.GET_ENTITY, resource);
+    public Referenceable getEntity(final String entityType, final String attribute, final String value)
+            throws AtlasServiceException {
+        JSONObject jsonResponse = callAPIWithRetries(API.GET_ENTITY, null, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                WebResource resource = getResource(API.GET_ENTITY);
+                resource = resource.queryParam(TYPE, entityType);
+                resource = resource.queryParam(ATTRIBUTE_NAME, attribute);
+                resource = resource.queryParam(ATTRIBUTE_VALUE, value);
+                return resource;
+            }
+        });
         try {
             String entityInstanceDefinition = jsonResponse.getString(AtlasClient.DEFINITION);
             return InstanceSerialization.fromJsonReferenceable(entityInstanceDefinition, true);
@@ -477,10 +669,15 @@ public class AtlasClient {
      * @return
      * @throws AtlasServiceException
      */
-    public List<String> listEntities(String entityType) throws AtlasServiceException {
-        WebResource resource = getResource(API.LIST_ENTITIES);
-        resource = resource.queryParam(TYPE, entityType);
-        JSONObject jsonResponse = callAPIWithResource(API.LIST_ENTITIES, resource);
+    public List<String> listEntities(final String entityType) throws AtlasServiceException {
+        JSONObject jsonResponse = callAPIWithRetries(API.LIST_ENTITIES, null, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                WebResource resource = getResource(API.LIST_ENTITIES);
+                resource = resource.queryParam(TYPE, entityType);
+                return resource;
+            }
+        });
         return extractResults(jsonResponse, AtlasClient.RESULTS);
     }
 
@@ -508,10 +705,15 @@ public class AtlasClient {
      * @return
      * @throws AtlasServiceException
      */
-    public JSONArray search(String searchQuery) throws AtlasServiceException {
-        WebResource resource = getResource(API.SEARCH);
-        resource = resource.queryParam(QUERY, searchQuery);
-        JSONObject result = callAPIWithResource(API.SEARCH, resource);
+    public JSONArray search(final String searchQuery) throws AtlasServiceException {
+        JSONObject result = callAPIWithRetries(API.SEARCH, null, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                WebResource resource = getResource(API.SEARCH);
+                resource = resource.queryParam(QUERY, searchQuery);
+                return resource;
+            }
+        });
         try {
             return result.getJSONArray(RESULTS);
         } catch (JSONException e) {
@@ -521,34 +723,21 @@ public class AtlasClient {
     }
 
     /**
-     * Search given type name, an attribute and its value. Uses search dsl
-     * @param typeName name of the entity type
-     * @param attributeName attribute name
-     * @param attributeValue attribute value
-     * @return result json object
-     * @throws AtlasServiceException
-     */
-    public JSONArray rawSearch(String typeName, String attributeName, Object attributeValue)
-            throws AtlasServiceException {
-        //        String gremlinQuery = String.format(
-        //                "g.V.has(\"typeName\",\"%s\").and(_().has(\"%s.%s\", T.eq, \"%s\")).toList()",
-        //                typeName, typeName, attributeName, attributeValue);
-        //        return searchByGremlin(gremlinQuery);
-        String dslQuery = String.format("%s where %s = \"%s\"", typeName, attributeName, attributeValue);
-        return searchByDSL(dslQuery);
-    }
-
-    /**
      * Search given query DSL
      * @param query DSL query
      * @return result json object
      * @throws AtlasServiceException
      */
-    public JSONArray searchByDSL(String query) throws AtlasServiceException {
+    public JSONArray searchByDSL(final String query) throws AtlasServiceException {
         LOG.debug("DSL query: {}", query);
-        WebResource resource = getResource(API.SEARCH_DSL);
-        resource = resource.queryParam(QUERY, query);
-        JSONObject result = callAPIWithResource(API.SEARCH_DSL, resource);
+        JSONObject result = callAPIWithRetries(API.SEARCH_DSL, null, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                WebResource resource = getResource(API.SEARCH_DSL);
+                resource = resource.queryParam(QUERY, query);
+                return resource;
+            }
+        });
         try {
             return result.getJSONArray(RESULTS);
         } catch (JSONException e) {
@@ -562,11 +751,16 @@ public class AtlasClient {
      * @return result json object
      * @throws AtlasServiceException
      */
-    public JSONArray searchByGremlin(String gremlinQuery) throws AtlasServiceException {
+    public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
         LOG.debug("Gremlin query: " + gremlinQuery);
-        WebResource resource = getResource(API.SEARCH_GREMLIN);
-        resource = resource.queryParam(QUERY, gremlinQuery);
-        JSONObject result = callAPIWithResource(API.SEARCH_GREMLIN, resource);
+        JSONObject result = callAPIWithRetries(API.SEARCH_GREMLIN, null, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                WebResource resource = getResource(API.SEARCH_GREMLIN);
+                resource = resource.queryParam(QUERY, gremlinQuery);
+                return resource;
+            }
+        });
         try {
             return result.getJSONArray(RESULTS);
         } catch (JSONException e) {
@@ -580,10 +774,15 @@ public class AtlasClient {
      * @return result json object
      * @throws AtlasServiceException
      */
-    public JSONObject searchByFullText(String query) throws AtlasServiceException {
-        WebResource resource = getResource(API.SEARCH_FULL_TEXT);
-        resource = resource.queryParam(QUERY, query);
-        return callAPIWithResource(API.SEARCH_FULL_TEXT, resource);
+    public JSONObject searchByFullText(final String query) throws AtlasServiceException {
+        return callAPIWithRetries(API.SEARCH_FULL_TEXT, null, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                WebResource resource = getResource(API.SEARCH_FULL_TEXT);
+                resource = resource.queryParam(QUERY, query);
+                return resource;
+            }
+        });
     }
 
     public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
@@ -604,15 +803,11 @@ public class AtlasClient {
         }
     }
 
-    public String getRequestId(JSONObject json) throws AtlasServiceException {
-        try {
-            return json.getString(REQUEST_ID);
-        } catch (JSONException e) {
-            throw new AtlasServiceException(e);
-        }
+    private WebResource getResource(API api, String... pathParams) {
+        return getResource(service, api, pathParams);
     }
 
-    private WebResource getResource(API api, String... pathParams) {
+    private WebResource getResource(WebResource service, API api, String... pathParams) {
         WebResource resource = service.path(api.getPath());
         if (pathParams != null) {
             for (String pathParam : pathParams) {
@@ -622,29 +817,75 @@ public class AtlasClient {
         return resource;
     }
 
-    private JSONObject callAPIWithResource(API api, WebResource resource) throws AtlasServiceException {
-        return callAPIWithResource(api, resource, null);
-    }
-
     private JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject)
         throws AtlasServiceException {
-        ClientResponse clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE)
-            .method(api.getMethod(), ClientResponse.class, requestObject);
-
-        if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
-            String responseAsString = clientResponse.getEntity(String.class);
-            try {
-                return new JSONObject(responseAsString);
-            } catch (JSONException e) {
-                throw new AtlasServiceException(api, e);
+        ClientResponse clientResponse = null;
+        for (int i = 0; i < getNumberOfRetries(); i++) {
+            clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE)
+                .method(api.getMethod(), ClientResponse.class, requestObject);
+
+            if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
+                String responseAsString = clientResponse.getEntity(String.class);
+                try {
+                    return new JSONObject(responseAsString);
+                } catch (JSONException e) {
+                    throw new AtlasServiceException(api, e);
+                }
+            } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
+                break;
+            } else {
+                LOG.error("Got a service unavailable when calling: {}, will retry..", resource);
+                sleepBetweenRetries();
             }
         }
 
         throw new AtlasServiceException(api, clientResponse);
     }
 
-    private JSONObject callAPI(API api, Object requestObject, String... pathParams) throws AtlasServiceException {
-        WebResource resource = getResource(api, pathParams);
-        return callAPIWithResource(api, resource, requestObject);
+    private JSONObject callAPI(final API api, Object requestObject, final String... pathParams)
+            throws AtlasServiceException {
+        return callAPIWithRetries(api, requestObject, new ResourceCreator() {
+            @Override
+            public WebResource createResource() {
+                return getResource(api, pathParams);
+            }
+        });
     }
+
+    /**
+     * A class to capture input state while creating the client.
+     *
+     * The information here will be reused when the client is re-initialized on switch-over
+     * in case of High Availability.
+     */
+    private class AtlasClientContext {
+        private String[] baseUrls;
+        private Client client;
+        private final UserGroupInformation ugi;
+        private final String doAsUser;
+
+        public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
+            this.baseUrls = baseUrls;
+            this.client = client;
+            this.ugi = ugi;
+            this.doAsUser = doAsUser;
+        }
+
+        public UserGroupInformation getUgi() {
+            return ugi;
+        }
+
+        public String getDoAsUser() {
+            return doAsUser;
+        }
+
+        public Client getClient() {
+            return client;
+        }
+
+        public String[] getBaseUrls() {
+            return baseUrls;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java b/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java
new file mode 100644
index 0000000..96df6a3
--- /dev/null
+++ b/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import scala.actors.threadpool.Arrays;
+
+import java.util.List;
+
+public class AtlasServerEnsemble {
+    // Validated, fixed set of Atlas server base URLs; always a private copy of the caller's array.
+    private final String[] urls;
+    /** @throws IllegalArgumentException if baseUrls is null/empty or contains a null/empty entry */
+    public AtlasServerEnsemble(String[] baseUrls) {
+        Preconditions.checkArgument((baseUrls!=null && baseUrls.length>0),
+                "List of baseURLs cannot be null or empty.");
+        for (String baseUrl : baseUrls) {
+            Preconditions.checkArgument(StringUtils.isNotEmpty(baseUrl),
+                    "Base URL cannot be null or empty.");
+        }
+        urls = baseUrls.clone(); // copy: later changes to the caller's array must not bypass the checks
+    }
+    /** @return true when exactly one server URL was supplied */
+    public boolean hasSingleInstance() {
+        return urls.length==1;
+    }
+    /** @return the first supplied URL; the constructor guarantees at least one exists */
+    public String firstURL() {
+        return urls[0];
+    }
+    /** @return a fixed-size list over a copy of the URLs; java.util.Arrays keeps it a typed List */
+    public List<String> getMembers() {
+        return java.util.Arrays.asList(urls.clone());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/ResourceCreator.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/ResourceCreator.java b/client/src/main/java/org/apache/atlas/ResourceCreator.java
new file mode 100644
index 0000000..53f92aa
--- /dev/null
+++ b/client/src/main/java/org/apache/atlas/ResourceCreator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * Deferred factory for a WebResource: implementations decide how the resource is created.
+ */
+public interface ResourceCreator {
+    WebResource createResource();
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/security/SecurityProperties.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/security/SecurityProperties.java b/client/src/main/java/org/apache/atlas/security/SecurityProperties.java
deleted file mode 100644
index b6c8c9b..0000000
--- a/client/src/main/java/org/apache/atlas/security/SecurityProperties.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.security;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- *
- */
-public interface SecurityProperties {
-    String TLS_ENABLED = "atlas.enableTLS";
-    String KEYSTORE_FILE_KEY = "keystore.file";
-    String DEFAULT_KEYSTORE_FILE_LOCATION = "target/atlas.keystore";
-    String KEYSTORE_PASSWORD_KEY = "keystore.password";
-    String TRUSTSTORE_FILE_KEY = "truststore.file";
-    String DEFATULT_TRUSTORE_FILE_LOCATION = "target/atlas.keystore";
-    String TRUSTSTORE_PASSWORD_KEY = "truststore.password";
-    String SERVER_CERT_PASSWORD_KEY = "password";
-    String CLIENT_AUTH_KEY = "client.auth.enabled";
-    String CERT_STORES_CREDENTIAL_PROVIDER_PATH = "cert.stores.credential.provider.path";
-    String SSL_CLIENT_PROPERTIES = "ssl-client.xml";
-    String BIND_ADDRESS = "atlas.server.bind.address";
-    String ATLAS_SSL_EXCLUDE_CIPHER_SUITES = "atlas.ssl.exclude.cipher.suites";
-    List<String> DEFAULT_CIPHER_SUITES = Arrays.asList(".*NULL.*", ".*RC4.*", ".*MD5.*",".*DES.*",".*DSS.*");
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
index 943301b..8911bf5 100644
--- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -17,28 +17,58 @@
 
 package org.apache.atlas;
 
+import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.fail;
 
 public class AtlasClientTest {
 
+    @Mock
+    private WebResource service;
+
+    @Mock
+    private Configuration configuration;
+
+    @Mock
+    private Client client;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
     @Test
     public void shouldVerifyServerIsReady() throws AtlasServiceException {
-        WebResource webResource = mock(WebResource.class);
-        AtlasClient atlasClient = new AtlasClient(webResource);
+        setupRetryParams();
 
-        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
         ClientResponse response = mock(ClientResponse.class);
         when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
         when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," +
@@ -49,19 +79,16 @@ public class AtlasClientTest {
     }
 
     private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) {
-        WebResource adminVersionResource = mock(WebResource.class);
-        when(webResource.path(api.getPath())).thenReturn(adminVersionResource);
-        WebResource.Builder builder = mock(WebResource.Builder.class);
-        when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
-        when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
+        when(webResource.path(api.getPath())).thenReturn(service);
+        WebResource.Builder builder = getBuilder(service);
         return builder;
     }
 
     @Test
     public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
-        WebResource webResource = mock(WebResource.class);
-        AtlasClient atlasClient = new AtlasClient(webResource);
-        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
+        setupRetryParams();
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
         when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow(
                 new ClientHandlerException());
         assertFalse(atlasClient.isServerReady());
@@ -69,9 +96,9 @@ public class AtlasClientTest {
 
     @Test
     public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException {
-        WebResource webResource = mock(WebResource.class);
-        AtlasClient atlasClient = new AtlasClient(webResource);
-        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
+        setupRetryParams();
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
         ClientResponse response = mock(ClientResponse.class);
         when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
         when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
@@ -83,9 +110,10 @@ public class AtlasClientTest {
 
     @Test(expectedExceptions = AtlasServiceException.class)
     public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException {
-        WebResource webResource = mock(WebResource.class);
-        AtlasClient atlasClient = new AtlasClient(webResource);
-        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
+        setupRetryParams();
+
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
         ClientResponse response = mock(ClientResponse.class);
         when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
         when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
@@ -98,10 +126,11 @@ public class AtlasClientTest {
     
     @Test
     public void shouldGetAdminStatus() throws AtlasServiceException {
-        WebResource webResource = mock(WebResource.class);
-        AtlasClient atlasClient = new AtlasClient(webResource);
+        setupRetryParams();
+
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
 
-        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
         ClientResponse response = mock(ClientResponse.class);
         when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
         when(response.getEntity(String.class)).thenReturn("{\"Status\":\"Active\"}");
@@ -113,10 +142,11 @@ public class AtlasClientTest {
 
     @Test(expectedExceptions = AtlasServiceException.class)
     public void shouldReturnStatusAsUnknownOnException() throws AtlasServiceException {
-        WebResource webResource = mock(WebResource.class);
-        AtlasClient atlasClient = new AtlasClient(webResource);
+        setupRetryParams();
+
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
 
-        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
         ClientResponse response = mock(ClientResponse.class);
         when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
         when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
@@ -128,10 +158,10 @@ public class AtlasClientTest {
 
     @Test
     public void shouldReturnStatusAsUnknownIfJSONIsInvalid() throws AtlasServiceException {
-        WebResource webResource = mock(WebResource.class);
-        AtlasClient atlasClient = new AtlasClient(webResource);
+        setupRetryParams();
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
 
-        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
         ClientResponse response = mock(ClientResponse.class);
         when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
         when(response.getEntity(String.class)).thenReturn("{\"status\":\"Active\"}");
@@ -140,4 +170,245 @@ public class AtlasClientTest {
         String status = atlasClient.getAdminStatus();
         assertEquals(status, AtlasClient.UNKNOWN_STATUS);
     }
+    
+    @Test
+    public void shouldReturnBaseURLAsPassedInURL() {
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+
+        String serviceURL = atlasClient.determineActiveServiceURL(new String[]{"http://localhost:21000"}, client);
+        assertEquals(serviceURL, "http://localhost:21000");
+    }
+
+    @Test
+    public void shouldSelectActiveAmongMultipleServersIfHAIsEnabled() {
+        setupRetryParams();
+
+        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
+        when(client.resource(UriBuilder.fromUri("http://localhost:41000").build())).thenReturn(service);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
+        ClientResponse firstResponse = mock(ClientResponse.class);
+        when(firstResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(firstResponse.getEntity(String.class)).thenReturn("{\"Status\":\"PASSIVE\"}");
+        ClientResponse secondResponse = mock(ClientResponse.class);
+        when(secondResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(secondResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
+        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
+                thenReturn(firstResponse).thenReturn(firstResponse).thenReturn(firstResponse).
+                thenReturn(secondResponse);
+
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+
+        String serviceURL = atlasClient.determineActiveServiceURL(
+                new String[]{"http://localhost:31000", "http://localhost:41000"},
+                client);
+        assertEquals(serviceURL, "http://localhost:41000");
+    }
+
+    @Test
+    public void shouldRetryUntilServiceBecomesActive() {
+        setupRetryParams();
+
+        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
+        ClientResponse nextResponse = mock(ClientResponse.class);
+        when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
+        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
+                thenReturn(response).thenReturn(response).thenReturn(nextResponse);
+
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+
+        String serviceURL = atlasClient.determineActiveServiceURL(
+                new String[] {"http://localhost:31000","http://localhost:41000"},
+                client);
+        assertEquals(serviceURL, "http://localhost:31000");
+    }
+
+    @Test
+    public void shouldRetryIfCannotConnectToServiceInitially() {
+        setupRetryParams();
+
+        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
+        ClientResponse nextResponse = mock(ClientResponse.class);
+        when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
+        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
+                thenThrow(new ClientHandlerException("Simulating connection exception")).
+                thenReturn(response).
+                thenReturn(nextResponse);
+
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+
+        String serviceURL = atlasClient.determineActiveServiceURL(
+                new String[] {"http://localhost:31000","http://localhost:41000"},
+                client);
+        assertEquals(serviceURL, "http://localhost:31000");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldThrowExceptionIfActiveServerIsNotFound() {
+        setupRetryParams();
+
+        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
+        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
+        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
+                thenThrow(new ClientHandlerException("Simulating connection exception")).
+                thenReturn(response).
+                thenReturn(response);
+
+        AtlasClient atlasClient = new AtlasClient(service, configuration);
+
+        // Expected to throw: the stubbed status never becomes ACTIVE, so there is no result to assert on.
+        atlasClient.determineActiveServiceURL(
+                new String[] {"http://localhost:31000","http://localhost:41000"},
+                client);
+    }
+
+    @Test
+    public void shouldRetryAPICallsOnClientHandlerException() throws AtlasServiceException, URISyntaxException {
+        setupRetryParams();
+
+        ResourceCreator resourceCreator = mock(ResourceCreator.class);
+        WebResource resourceObject = mock(WebResource.class);
+        when(resourceObject.getURI()).
+                thenReturn(new URI("http://localhost:31000/api/atlas/types")).
+                thenReturn(new URI("http://localhost:41000/api/atlas/types")).
+                thenReturn(new URI("http://localhost:41000/api/atlas/types"));
+
+        WebResource.Builder builder = getBuilder(resourceObject);
+
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
+
+        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
+                thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
+                thenReturn(response);
+
+        when(resourceCreator.createResource()).thenReturn(resourceObject);
+
+        AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");
+
+        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
+
+        verify(client).destroy();
+        verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
+        verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
+    }
+
+    @Test
+    public void shouldRetryWithSameClientIfSingleAddressIsUsed() throws URISyntaxException, AtlasServiceException {
+        setupRetryParams();
+
+        ResourceCreator resourceCreator = mock(ResourceCreator.class);
+        WebResource resourceObject = mock(WebResource.class);
+        when(resourceObject.getURI()).
+                thenReturn(new URI("http://localhost:31000/api/atlas/types"));
+
+        WebResource.Builder builder = getBuilder(resourceObject);
+
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
+
+        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
+                thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
+                thenReturn(response);
+
+        when(resourceCreator.createResource()).thenReturn(resourceObject);
+
+        AtlasClient atlasClient = getClientForTest("http://localhost:31000");
+
+        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
+
+        verify(client).destroy();
+        verify(client, times(2)).resource(UriBuilder.fromUri("http://localhost:31000").build());
+    }
+
+    @Test
+    public void shouldRetryAPICallsOnServiceUnavailable() throws AtlasServiceException, URISyntaxException {
+        setupRetryParams();
+
+        ResourceCreator resourceCreator = mock(ResourceCreator.class);
+        WebResource resourceObject = mock(WebResource.class);
+        when(resourceObject.getURI()).
+                thenReturn(new URI("http://localhost:31000/api/atlas/types")).
+                thenReturn(new URI("http://localhost:41000/api/atlas/types")).
+                thenReturn(new URI("http://localhost:41000/api/atlas/types"));
+
+        WebResource.Builder builder = getBuilder(resourceObject);
+
+        ClientResponse firstResponse = mock(ClientResponse.class);
+        when(firstResponse.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
+        when(firstResponse.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
+
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
+
+        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
+                thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
+                thenReturn(firstResponse).
+                thenReturn(response);
+
+        when(resourceCreator.createResource()).thenReturn(resourceObject);
+
+        AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");
+
+        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
+
+        verify(client).destroy();
+        verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
+        verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
+    }
+
+    private WebResource.Builder getBuilder(WebResource resourceObject) {
+        WebResource.Builder builder = mock(WebResource.Builder.class);
+        when(resourceObject.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
+        when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
+        return builder;
+    }
+
+    private void setupRetryParams() {
+        when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasClient.DEFAULT_NUM_RETRIES)).
+                thenReturn(3);
+        when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY,
+                AtlasClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS)).
+                thenReturn(1);
+    }
+
+    private AtlasClient getClientForTest(final String... baseUrls) {
+        return new AtlasClient(null, null, baseUrls) { // overrides below replace lookup, config and client
+            boolean firstCall = true; // flips to false after the first determineActiveServiceURL call
+            @Override
+            protected String determineActiveServiceURL(String[] baseUrls, Client client) {
+                String returnUrl = baseUrls[0]; // first call always answers with the first URL
+                if (baseUrls.length > 1 && !firstCall) {
+                    returnUrl = baseUrls[1]; // later calls answer with the second URL when there is one
+                }
+                firstCall = false;
+                return returnUrl;
+            }
+
+            @Override
+            protected Configuration getClientProperties() {
+                return configuration; // the mocked Configuration field of this test class
+            }
+
+            @Override
+            protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
+                return client; // the mocked Client field, so tests can verify destroy()/resource() calls
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 1f3d96e..614b3f6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -51,5 +51,10 @@
             <artifactId>commons-configuration</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java
new file mode 100644
index 0000000..2e86a19
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.ha;
+
+import org.apache.atlas.security.SecurityProperties;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A wrapper for getting configuration entries related to HighAvailability.
+ */
+public final class HAConfiguration {
+
+    private HAConfiguration() {
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class);
+
+    public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha.";
+    public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + "enabled";
+    public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address.";
+    public static final String ATLAS_SERVER_IDS = "atlas.server.ids";
+    public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + "zookeeper.connect";
+    public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000;
+    public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS =
+            ATLAS_SERVER_HA_PREFIX + "zookeeper.retry.sleeptime.ms";
+    public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + "zookeeper.num.retries";
+    public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3;
+    public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS =
+            ATLAS_SERVER_HA_PREFIX + "zookeeper.session.timeout.ms";
+    public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000;
+
+    /**
+     * Return whether HA is enabled or not.
+     * @param configuration underlying configuration instance
+     * @return the value of {@link #ATLAS_SERVER_HA_ENABLED_KEY}, or false when that key is not set
+     */
+    public static boolean isHAEnabled(Configuration configuration) {
+        return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false);
+    }
+
+    /**
+     * Build the web server address of the server instance with the passed ID.
+     *
+     * The scheme is https when {@link SecurityProperties#TLS_ENABLED} is true and http otherwise.
+     * NOTE(review): TLS_ENABLED is read without a default value - confirm it is always configured.
+     * @param configuration underlying configuration
+     * @param serverId id whose host:port property (address prefix + id) supplies the address
+     * @return the scheme followed by the configured host:port value
+     */
+    public static String getBoundAddressForId(Configuration configuration, String serverId) {
+        String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX + serverId);
+        if (configuration.getBoolean(SecurityProperties.TLS_ENABLED)) {
+            return "https://" + hostPort;
+        }
+        return "http://" + hostPort;
+    }
+
+    public static List<String> getServerInstances(Configuration configuration) {
+        final String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS); // configured server ids
+        final List<String> addresses = new ArrayList<>(ids.length);
+        for (int i = 0; i < ids.length; i++) {
+            addresses.add(getBoundAddressForId(configuration, ids[i])); // id -> http(s)://host:port
+        }
+        return addresses;
+    }
+
+    /**
+     * A collection of Zookeeper specific configuration that is used by High Availability code.
+     *
+     * Value object: all fields are assigned once in the constructor and take part in equals/hashCode.
+     */
+    public static class ZookeeperProperties {
+        private final String connectString;
+        private final int retriesSleepTimeMillis;
+        private final int numRetries;
+        private final int sessionTimeout;
+
+        /**
+         * Stores the four settings exactly as given; no validation is performed.
+         * @param connectString may be null (equals and hashCode both tolerate that)
+         */
+        public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries,
+                                   int sessionTimeout) {
+            this.connectString = connectString;
+            this.retriesSleepTimeMillis = retriesSleepTimeMillis;
+            this.numRetries = numRetries;
+            this.sessionTimeout = sessionTimeout;
+        }
+
+        /** @return the connect string passed to the constructor, possibly null */
+        public String getConnectString() {
+            return connectString;
+        }
+
+        /** @return the retry sleep time passed to the constructor */
+        public int getRetriesSleepTimeMillis() {
+            return retriesSleepTimeMillis;
+        }
+
+        /** @return the retry count passed to the constructor */
+        public int getNumRetries() {
+            return numRetries;
+        }
+
+        /** @return the session timeout passed to the constructor */
+        public int getSessionTimeout() {
+            return sessionTimeout;
+        }
+
+        /** Two instances are equal when they have the same class and all four settings match. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ZookeeperProperties that = (ZookeeperProperties) o;
+            return retriesSleepTimeMillis == that.retriesSleepTimeMillis
+                    && numRetries == that.numRetries
+                    && sessionTimeout == that.sessionTimeout
+                    && java.util.Objects.equals(connectString, that.connectString);
+        }
+
+        /** Combines the same four settings equals compares; a null connect string hashes as 0. */
+        @Override
+        public int hashCode() {
+            int result = connectString != null ? connectString.hashCode() : 0;
+            result = 31 * result + retriesSleepTimeMillis;
+            result = 31 * result + numRetries;
+            result = 31 * result + sessionTimeout;
+            return result;
+        }
+    }
+
+    /** Reads the Zookeeper settings used for HA, applying the defaults declared in this class. */
+    public static ZookeeperProperties getZookeeperProperties(Configuration configuration) {
+        // The HA-specific connect string, when configured, wins over the Kafka zookeeper setting.
+        String connect = configuration.getString("atlas.kafka.zookeeper.connect");
+        if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) {
+            connect = configuration.getString(HA_ZOOKEEPER_CONNECT);
+        }
+
+        final int sleepMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS,
+                DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS);
+        final int retries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES);
+        final int sessionTimeoutMillis = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS,
+                DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
+        return new ZookeeperProperties(connect, sleepMillis, retries, sessionTimeoutMillis);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/src/main/java/org/apache/atlas/security/SecurityProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/security/SecurityProperties.java b/common/src/main/java/org/apache/atlas/security/SecurityProperties.java
new file mode 100644
index 0000000..191d869
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/security/SecurityProperties.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.security;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Non-instantiable holder of string constants (property names and default values) for TLS, key/trust stores and cipher suites.
+ */
+public final class SecurityProperties {
+
+    private SecurityProperties() {
+    }
+    // NOTE(review): DEFATULT_TRUSTORE_FILE_LOCATION is misspelled and shares its value with DEFAULT_KEYSTORE_FILE_LOCATION.
+    public static final String TLS_ENABLED = "atlas.enableTLS";
+    public static final String KEYSTORE_FILE_KEY = "keystore.file";
+    public static final String DEFAULT_KEYSTORE_FILE_LOCATION = "target/atlas.keystore";
+    public static final String KEYSTORE_PASSWORD_KEY = "keystore.password";
+    public static final String TRUSTSTORE_FILE_KEY = "truststore.file";
+    public static final String DEFATULT_TRUSTORE_FILE_LOCATION = "target/atlas.keystore";
+    public static final String TRUSTSTORE_PASSWORD_KEY = "truststore.password";
+    public static final String SERVER_CERT_PASSWORD_KEY = "password";
+    public static final String CLIENT_AUTH_KEY = "client.auth.enabled";
+    public static final String CERT_STORES_CREDENTIAL_PROVIDER_PATH = "cert.stores.credential.provider.path";
+    public static final String SSL_CLIENT_PROPERTIES = "ssl-client.xml";
+    public static final String BIND_ADDRESS = "atlas.server.bind.address";
+    public static final String ATLAS_SSL_EXCLUDE_CIPHER_SUITES = "atlas.ssl.exclude.cipher.suites";
+    public static final List<String> DEFAULT_CIPHER_SUITES = Arrays.asList(
+            ".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*");
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java b/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java
new file mode 100644
index 0000000..8f0b9c5
--- /dev/null
+++ b/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.ha;
+
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.security.SecurityProperties;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class HAConfigurationTest {
+    // Mocked Configuration, so each test stubs only the keys it needs.
+    @Mock
+    private Configuration configuration;
+    // Initializes the @Mock field and sets the app-port system property to its default before each test method.
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, AtlasConstants.DEFAULT_APP_PORT_STR);
+    }
+    // With TLS_ENABLED stubbed to true, the address for "id1" is expected to carry the "https://" scheme.
+    @Test
+    public void testShouldReturnHTTPSBoundAddress() {
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443");
+        when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true);
+
+        String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
+
+        assertEquals(address, "https://127.0.0.1:21443");
+    }
+    // TLS_ENABLED is not stubbed here; both configured ids are expected to come back as "http://" addresses.
+    @Test
+    public void testShouldReturnListOfAddressesInConfig() {
+        when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"});
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:31000");
+
+        List<String> serverInstances = HAConfiguration.getServerInstances(configuration);
+        assertEquals(serverInstances.size(), 2);
+        assertTrue(serverInstances.contains("http://127.0.0.1:21000"));
+        assertTrue(serverInstances.contains("http://127.0.0.1:31000"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 586a49e..10fcdbb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai)
 ATLAS-620 Disable hbase based entity audit (shwethags)
 ATLAS-618 Fix assembly for hdfs-module (sumasai via yhemanth)
 ATLAS-573 Inherited attributes disappear from entities after server restart (dkantor via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java b/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java
new file mode 100644
index 0000000..f3d36a7
--- /dev/null
+++ b/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.ha;
+
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+public class AtlasServerIdSelector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasServerIdSelector.class);
+
+    /**
+     * Return the ID corresponding to this Atlas instance.
+     *
+     * The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key
+     * that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where
+     * the host is a local IP address and port is set in the system property
+     * {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}.
+     * That property is read with Integer.parseInt, so a missing or non-numeric value raises NumberFormatException.
+     * @param configuration configuration holding the server ID list and the per-ID host:port entries
+     * @return the first ID, in configured order, whose address is local and whose port equals the app port
+     * @throws AtlasException if no ID is found that maps to a local IP Address or port
+     */
+    public static String selectServerId(Configuration configuration) throws AtlasException {
+        // ids are already trimmed by this method
+        String[] ids = configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS);
+        String matchingServerId = null;
+        int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));
+        for (String id : ids) {
+            String hostPort = configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +id);
+            if (!StringUtils.isEmpty(hostPort)) {
+                InetSocketAddress socketAddress;
+                try {
+                    socketAddress = NetUtils.createSocketAddr(hostPort);
+                } catch (Exception e) {
+                    LOG.warn("Exception while trying to get socket address for " + hostPort, e);
+                    continue; // an unparsable entry is logged and skipped; remaining ids are still checked
+                }
+                if (!socketAddress.isUnresolved()
+                        && NetUtils.isLocalAddress(socketAddress.getAddress())
+                        && appPort == socketAddress.getPort()) {
+                    LOG.info("Found matched server id " + id + " with host port: " + hostPort);
+                    matchingServerId = id;
+                    break; // first match wins
+                }
+            } else {
+                LOG.info("Could not find matching address entry for id: " + id);
+            }
+        }
+        if (matchingServerId == null) {
+            String msg = String.format("Could not find server id for this instance. " +
+                            "Unable to find IDs matching any local host and port binding among %s",
+                    StringUtils.join(ids, ","));
+            throw new AtlasException(msg);
+        }
+        return matchingServerId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java
deleted file mode 100644
index 06977c5..0000000
--- a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.ha;
-
-import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.security.SecurityProperties;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- * A wrapper for getting configuration entries related to HighAvailability.
- */
-public class HAConfiguration {
-
-    private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class);
-
-    public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha";
-    public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + ".enabled";
-    public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address.";
-    public static final String ATLAS_SERVER_IDS = "atlas.server.ids";
-    public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ".zookeeper.connect";
-    public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000;
-    public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.retry.sleeptime.ms";
-    public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ".zookeeper.num.retries";
-    public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3;
-    public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.session.timeout.ms";
-    public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000;
-
-    /**
-     * Return whether HA is enabled or not.
-     * @param configuration underlying configuration instance
-     * @return
-     */
-    public static boolean isHAEnabled(Configuration configuration) {
-        return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false);
-    }
-
-    /**
-     * Return the ID corresponding to this Atlas instance.
-     *
-     * The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key
-     * that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where
-     * the host is a local IP address and port is set in the system property
-     * {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}.
-     *
-     * @param configuration
-     * @return
-     * @throws AtlasException if no ID is found that maps to a local IP Address or port
-     */
-    public static String getAtlasServerId(Configuration configuration) throws AtlasException {
-        // ids are already trimmed by this method
-        String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS);
-        String matchingServerId = null;
-        int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));
-        for (String id : ids) {
-            String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +id);
-            if (!StringUtils.isEmpty(hostPort)) {
-                InetSocketAddress socketAddress;
-                try {
-                    socketAddress = NetUtils.createSocketAddr(hostPort);
-                } catch (Exception e) {
-                    LOG.warn("Exception while trying to get socket address for " + hostPort, e);
-                    continue;
-                }
-                if (!socketAddress.isUnresolved()
-                        && NetUtils.isLocalAddress(socketAddress.getAddress())
-                        && appPort == socketAddress.getPort()) {
-                    LOG.info("Found matched server id " + id + " with host port: " + hostPort);
-                    matchingServerId = id;
-                    break;
-                }
-            } else {
-                LOG.info("Could not find matching address entry for id: " + id);
-            }
-        }
-        if (matchingServerId == null) {
-            String msg = String.format("Could not find server id for this instance. " +
-                    "Unable to find IDs matching any local host and port binding among %s",
-                    StringUtils.join(ids, ","));
-            throw new AtlasException(msg);
-        }
-        return matchingServerId;
-    }
-
-    /**
-     * Get the web server address that a server instance with the passed ID is bound to.
-     *
-     * This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether
-     * the URL is http or https.
-     *
-     * @param configuration underlying configuration
-     * @param serverId serverId whose host:port property is picked to build the web server address.
-     * @return
-     */
-    public static String getBoundAddressForId(Configuration configuration, String serverId) {
-        String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +serverId);
-        boolean isSecure = configuration.getBoolean(SecurityProperties.TLS_ENABLED);
-        String protocol = (isSecure) ? "https://" : "http://";
-        return protocol + hostPort;
-    }
-
-    /**
-     * A collection of Zookeeper specific configuration that is used by High Availability code
-     */
-    public static class ZookeeperProperties {
-        private String connectString;
-        private int retriesSleepTimeMillis;
-        private int numRetries;
-        private int sessionTimeout;
-
-        public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries,
-                                   int sessionTimeout) {
-            this.connectString = connectString;
-            this.retriesSleepTimeMillis = retriesSleepTimeMillis;
-            this.numRetries = numRetries;
-            this.sessionTimeout = sessionTimeout;
-        }
-
-        public String getConnectString() {
-            return connectString;
-        }
-
-        public int getRetriesSleepTimeMillis() {
-            return retriesSleepTimeMillis;
-        }
-
-        public int getNumRetries() {
-            return numRetries;
-        }
-
-        public int getSessionTimeout() {
-            return sessionTimeout;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            ZookeeperProperties that = (ZookeeperProperties) o;
-
-            if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) return false;
-            if (numRetries != that.numRetries) return false;
-            if (sessionTimeout != that.sessionTimeout) return false;
-            return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null);
-
-        }
-
-        @Override
-        public int hashCode() {
-            int result = connectString != null ? connectString.hashCode() : 0;
-            result = 31 * result + retriesSleepTimeMillis;
-            result = 31 * result + numRetries;
-            result = 31 * result + sessionTimeout;
-            return result;
-        }
-    }
-
-    public static ZookeeperProperties getZookeeperProperties(Configuration configuration) {
-        String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect");
-        if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) {
-            zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT);
-        }
-
-        int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS,
-                DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS);
-
-        int numRetries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES);
-
-        int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS,
-                DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
-        return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout);
-    }
-}


Mime
View raw message