atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [2/2] incubator-atlas git commit: ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)
Date Fri, 01 Apr 2016 05:38:50 GMT
ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8bde666b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8bde666b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8bde666b

Branch: refs/heads/master
Commit: 8bde666ba1986f5b6c9e20cad82d6037a6739db9
Parents: bca454e
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Fri Apr 1 11:08:39 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Fri Apr 1 11:08:39 2016 +0530

----------------------------------------------------------------------
 addons/falcon-bridge/pom.xml                    |   5 +
 addons/hive-bridge/pom.xml                      |   5 +
 addons/sqoop-bridge/pom.xml                     |   5 +
 addons/storm-bridge/pom.xml                     |   5 +
 .../main/java/org/apache/atlas/AtlasClient.java |   6 +
 .../java/org/apache/atlas/AtlasClientTest.java  |  30 ++
 .../java/org/apache/atlas/AtlasConstants.java   |   2 +
 distro/src/conf/atlas-application.properties    |  11 +-
 .../apache/atlas/kafka/KafkaNotification.java   |   4 +
 .../notification/AbstractNotification.java      |   7 +
 .../notification/NotificationHookConsumer.java  | 100 ++++-
 .../NotificationHookConsumerTest.java           |  95 ++++-
 release-log.txt                                 |   1 +
 .../audit/HBaseBasedAuditRepository.java        |  39 +-
 .../graph/GraphBackedSearchIndexer.java         |  41 ++-
 .../atlas/services/DefaultMetadataService.java  |  62 +++-
 .../audit/HBaseBasedAuditRepositoryHATest.java  |  94 +++++
 .../graph/GraphBackedSearchIndexerTest.java     |  94 +++++
 .../DefaultMetadataServiceMockTest.java         | 104 +++++-
 server-api/pom.xml                              |  13 +
 .../org/apache/atlas/ha/HAConfiguration.java    | 196 ++++++++++
 .../listener/ActiveStateChangeHandler.java      |  49 +++
 .../apache/atlas/ha/HAConfigurationTest.java    |  90 +++++
 .../atlas/typesystem/types/TypeSystem.java      |  10 +-
 .../main/resources/atlas-application.properties |   6 +-
 .../atlas/typesystem/types/TypeSystemTest.java  |  47 ++-
 .../src/main/java/org/apache/atlas/Atlas.java   |   8 +-
 .../atlas/web/filters/ActiveServerFilter.java   | 139 +++++++
 .../atlas/web/listeners/GuiceServletConfig.java |  47 ++-
 .../service/ActiveInstanceElectorModule.java    |  49 +++
 .../service/ActiveInstanceElectorService.java   | 197 ++++++++++
 .../atlas/web/service/ActiveInstanceState.java  | 109 ++++++
 .../atlas/web/service/CuratorFactory.java       |  94 +++++
 .../apache/atlas/web/service/ServiceState.java  |  96 +++++
 .../web/filters/ActiveServerFilterTest.java     | 172 +++++++++
 .../ActiveInstanceElectorServiceTest.java       | 364 +++++++++++++++++++
 .../web/service/ActiveInstanceStateTest.java    | 137 +++++++
 .../atlas/web/service/ServiceStateTest.java     |  67 ++++
 38 files changed, 2530 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/falcon-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index ad345c5..afbc150 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -151,6 +151,11 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>atlas-server-api</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 8bfbb13..720b6d1 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -229,6 +229,11 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>atlas-server-api</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/sqoop-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 343bb4e..4b5dbb1 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -234,6 +234,11 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>atlas-server-api</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index e3b4ed7..9efa568 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -184,6 +184,11 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>atlas-server-api</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index 1fc811a..18c0569 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -145,6 +145,12 @@ public class AtlasClient {
             return true;
         } catch (ClientHandlerException che) {
             return false;
+        } catch (AtlasServiceException ase) {
+            if (ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) {
+                LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready");
+                return false;
+            }
+            throw ase;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
index 1e7eed1..6e1fbe2 100644
--- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.fail;
 
 public class AtlasClientTest {
 
@@ -64,4 +65,33 @@ public class AtlasClientTest {
                 new ClientHandlerException());
         assertFalse(atlasClient.isServerReady());
     }
+
+    @Test
+    public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException {
+        WebResource webResource = mock(WebResource.class);
+        AtlasClient atlasClient = new AtlasClient(webResource);
+        WebResource.Builder builder = setupBuilder(webResource);
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
+        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
+
+        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);
+
+        assertFalse(atlasClient.isServerReady());
+    }
+
+    @Test(expectedExceptions = AtlasServiceException.class)
+    public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException {
+        WebResource webResource = mock(WebResource.class);
+        AtlasClient atlasClient = new AtlasClient(webResource);
+        WebResource.Builder builder = setupBuilder(webResource);
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
+
+        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);
+
+        atlasClient.isServerReady();
+        fail("Should throw exception");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/common/src/main/java/org/apache/atlas/AtlasConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java
index 85719c9..950ed6b 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConstants.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java
@@ -28,4 +28,6 @@ public final class AtlasConstants {
     public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
     public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
+    public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port";
+    public static final String DEFAULT_APP_PORT_STR = "21000";
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 453435b..00c5d5a 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -95,4 +95,13 @@ atlas.http.authentication.enabled=false
 atlas.http.authentication.type=simple
 
 #########  Server Properties  #########
-atlas.rest.address=http://localhost:21000
\ No newline at end of file
+atlas.rest.address=http://localhost:21000
+
+#########  High Availability Configuration ########
+atlas.server.ha.enabled=false
+atlas.server.ids=id1
+atlas.server.address.id1=localhost:21000
+atlas.server.ha.zookeeper.connect=localhost:2181
+atlas.server.ha.zookeeper.retry.sleeptime.ms=1000
+atlas.server.ha.zookeeper.num.retries=3
+atlas.server.ha.zookeeper.session.timeout.ms=20000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 2701039..889af11 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -127,6 +127,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
 
     @Override
     public void start() throws AtlasException {
+        if (isHAEnabled()) {
+            LOG.info("Not starting embedded instances when HA is enabled.");
+            return;
+        }
         if (isEmbedded()) {
             try {
                 startZk();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 885242d..596f988 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.ha.HAConfiguration;
 import org.apache.commons.configuration.Configuration;
 
 import java.util.Arrays;
@@ -30,12 +31,14 @@ public abstract class AbstractNotification implements NotificationInterface {
 
     private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
     private final boolean embedded;
+    private final boolean isHAEnabled;
 
 
     // ----- Constructors ------------------------------------------------------
 
     public AbstractNotification(Configuration applicationProperties) throws AtlasException {
         this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
+        this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
     }
 
 
@@ -50,6 +53,10 @@ public abstract class AbstractNotification implements NotificationInterface {
         return embedded;
     }
 
+    protected final boolean isHAEnabled() {
+        return isHAEnabled;
+    }
+
     @Override
     public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
         String[] strMessages = new String[messages.size()];

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 2fcbcd3..ca53fd2 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -23,6 +23,8 @@ import kafka.consumer.ConsumerTimeoutException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
@@ -30,39 +32,69 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
  */
 @Singleton
-public class NotificationHookConsumer implements Service {
+public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
     private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
 
     public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
     public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
-    @Inject
     private NotificationInterface notificationInterface;
     private ExecutorService executors;
     private String atlasEndpoint;
+    private Configuration applicationProperties;
+    private List<HookConsumer> consumers;
+
+    @Inject
+    public NotificationHookConsumer(NotificationInterface notificationInterface) {
+        this.notificationInterface = notificationInterface;
+    }
 
     @Override
     public void start() throws AtlasException {
-        Configuration applicationProperties = ApplicationProperties.get();
+        Configuration configuration = ApplicationProperties.get();
+        startInternal(configuration, null);
+    }
 
-        atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
+    void startInternal(Configuration configuration,
+                       ExecutorService executorService) {
+        this.applicationProperties = configuration;
+        this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
+        if (consumers == null) {
+            consumers = new ArrayList<>();
+        }
+        if (executorService != null) {
+            executors = executorService;
+        }
+        if (!HAConfiguration.isHAEnabled(configuration)) {
+            LOG.info("HA is disabled, starting consumers inline.");
+            startConsumers(executorService);
+        }
+    }
+
+    private void startConsumers(ExecutorService executorService) {
         int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers =
+        List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
-        executors = Executors.newFixedThreadPool(consumers.size());
-
-        for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) {
-            executors.submit(new HookConsumer(consumer));
+        if (executorService == null) {
+            executorService = Executors.newFixedThreadPool(notificationConsumers.size());
+        }
+        executors = executorService;
+        for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
+            HookConsumer hookConsumer = new HookConsumer(consumer);
+            consumers.add(hookConsumer);
+            executors.submit(hookConsumer);
         }
     }
 
@@ -71,14 +103,52 @@ public class NotificationHookConsumer implements Service {
         //Allow for completion of outstanding work
         notificationInterface.close();
         try {
-            if (executors != null && !executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+            if (executors != null) {
+                stopConsumerThreads();
+                executors.shutdownNow();
+                if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                    LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+                }
+                executors = null;
             }
         } catch (InterruptedException e) {
             LOG.error("Failure in shutting down consumers");
         }
     }
 
+    private void stopConsumerThreads() {
+        if (consumers != null) {
+            for (HookConsumer consumer : consumers) {
+                consumer.stop();
+            }
+            consumers.clear();
+        }
+    }
+
+    /**
+     * Start Kafka consumer threads that read from Kafka topic when server is activated.
+     *
+     * Since the consumers create / update entities to the shared backend store, only the active instance
+     * should perform this activity. Hence, these threads are started only on server activation.
+     */
+    @Override
+    public void instanceIsActive() {
+        LOG.info("Reacting to active state: initializing Kafka consumers");
+        startConsumers(executors);
+    }
+
+    /**
+     * Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
+     *
+     * Since the consumers create / update entities to the shared backend store, only the active instance
+     * should perform this activity. Hence, these threads are stopped only on server deactivation.
+     */
+    @Override
+    public void instanceIsPassive() {
+        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
+        stop();
+    }
+
     static class Timer {
         public void sleep(int interval) throws InterruptedException {
             Thread.sleep(interval);
@@ -87,6 +157,7 @@ public class NotificationHookConsumer implements Service {
 
     class HookConsumer implements Runnable {
         private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
+        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
 
         public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
             this.consumer = consumer;
@@ -102,12 +173,13 @@ public class NotificationHookConsumer implements Service {
 
         @Override
         public void run() {
+            shouldRun.set(true);
 
             if (!serverAvailable(new NotificationHookConsumer.Timer())) {
                 return;
             }
 
-            while (true) {
+            while (shouldRun.get()) {
                 try {
                     if (hasNext()) {
                         HookNotification.HookNotificationMessage message = consumer.next();
@@ -177,5 +249,9 @@ public class NotificationHookConsumer implements Service {
             LOG.info("Atlas Server is ready, can start reading Kafka events.");
             return true;
         }
+
+        public void stop() {
+            shouldRun.set(false);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 02255a7..177de6d 100644
--- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -20,18 +20,43 @@ package org.apache.atlas.notification;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
 import static org.mockito.Mockito.*;
 import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertTrue;
 
 public class NotificationHookConsumerTest {
 
+    @Mock
+    private NotificationInterface notificationInterface;
+
+    @Mock
+    private AtlasClient atlasClient;
+
+    @Mock
+    private Configuration configuration;
+
+    @Mock
+    private ExecutorService executorService;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
     @Test
     public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
-        final AtlasClient atlasClient = mock(AtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
                     @Override
@@ -49,8 +74,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
-        final AtlasClient atlasClient = mock(AtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
                     @Override
@@ -68,8 +92,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
-        final AtlasClient atlasClient = mock(AtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
                     @Override
@@ -86,8 +109,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
-        final AtlasClient atlasClient = mock(AtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
         NotificationHookConsumer.HookConsumer hookConsumer =
                 notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
                     @Override
@@ -101,4 +123,61 @@ public class NotificationHookConsumerTest {
 
         assertFalse(hookConsumer.serverAvailable(timer));
     }
+
+    @Test
+    public void testConsumersStartedIfHAIsDisabled() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+    }
+
+    @Test
+    public void testConsumersAreNotStartedIfHAIsEnabled() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        verifyZeroInteractions(notificationInterface);
+    }
+
+    @Test
+    public void testConsumersAreStartedWhenInstanceBecomesActive() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        notificationHookConsumer.instanceIsActive();
+        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+    }
+
+    @Test
+    public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        notificationHookConsumer.instanceIsPassive();
+        verify(notificationInterface).close();
+        verify(executorService).shutdownNow();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index aaef9e3..87e39e6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)
 ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
 ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags)
 ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index ae6e988..c4329a5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -18,8 +18,12 @@
 
 package org.apache.atlas.repository.audit;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Singleton;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
@@ -59,7 +63,8 @@ import java.util.List;
  * and only 1 version is kept, there can be just 1 audit event per entity id + timestamp. This is ok for one atlas server.
  * But if there are more than one atlas servers, we should use server id in the key
  */
-public class HBaseBasedAuditRepository implements Service, EntityAuditRepository {
+@Singleton
+public class HBaseBasedAuditRepository implements Service, EntityAuditRepository, ActiveStateChangeHandler {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class);
 
     public static final String CONFIG_PREFIX = "atlas.audit";
@@ -237,23 +242,47 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
 
     @Override
     public void start() throws AtlasException {
-        Configuration atlasConf = ApplicationProperties.get();
+        Configuration configuration = ApplicationProperties.get();
+        startInternal(configuration, getHBaseConfiguration(configuration));
+    }
+
+    @VisibleForTesting
+    void startInternal(Configuration atlasConf,
+                                 org.apache.hadoop.conf.Configuration hbaseConf) throws AtlasException {
 
         String tableNameStr = atlasConf.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME);
         tableName = TableName.valueOf(tableNameStr);
 
         try {
-            org.apache.hadoop.conf.Configuration hbaseConf = getHBaseConfiguration(atlasConf);
-            connection = ConnectionFactory.createConnection(hbaseConf);
+            connection = createConnection(hbaseConf);
         } catch (IOException e) {
             throw new AtlasException(e);
         }
 
-        createTableIfNotExists();
+        if (!HAConfiguration.isHAEnabled(atlasConf)) {
+            LOG.info("HA is disabled. Hence creating table on startup.");
+            createTableIfNotExists();
+        }
+    }
+
+    @VisibleForTesting
+    protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) throws IOException {
+        return ConnectionFactory.createConnection(hbaseConf);
     }
 
     @Override
     public void stop() throws AtlasException {
         close(connection);
     }
+
+    @Override
+    public void instanceIsActive() throws AtlasException {
+        LOG.info("Reacting to active: Creating HBase table for Audit if required.");
+        createTableIfNotExists();
+    }
+
+    @Override
+    public void instanceIsPassive() {
+        LOG.info("Reacting to passive: No action for now.");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 7eccc58..e7e8fb9 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -26,12 +26,15 @@ import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
 import com.thinkaurelius.titan.core.schema.TitanManagement;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.discovery.SearchIndexer;
+import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.IndexCreationException;
 import org.apache.atlas.repository.IndexException;
 import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.typesystem.types.AttributeInfo;
 import org.apache.atlas.typesystem.types.ClassType;
 import org.apache.atlas.typesystem.types.DataTypes;
@@ -39,6 +42,7 @@ import org.apache.atlas.typesystem.types.IDataType;
 import org.apache.atlas.typesystem.types.Multiplicity;
 import org.apache.atlas.typesystem.types.StructType;
 import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +57,7 @@ import java.util.Map;
 /**
  * Adds index for properties of a given type when its added before any instances are added.
  */
-public class GraphBackedSearchIndexer implements SearchIndexer {
+public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class);
 
@@ -67,13 +71,16 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
 
     @Inject
     public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException,
-            IndexException {
+            AtlasException {
+        this(graphProvider, ApplicationProperties.get());
+    }
 
+    GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider, Configuration configuration)
+            throws IndexException, RepositoryException {
         this.titanGraph = graphProvider.get();
-
-        /* Create the transaction for indexing.
-         */
-        initialize();
+        if (!HAConfiguration.isHAEnabled(configuration)) {
+            initialize();
+        }
     }
 
     /**
@@ -355,6 +362,28 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
         }
     }
 
+    /**
+     * Initialize global indices for Titan graph on server activation.
+     *
+     * Since the indices are shared state, we need to do this only from an active instance.
+     */
+    @Override
+    public void instanceIsActive() throws AtlasException {
+        LOG.info("Reacting to active: initializing index");
+        try {
+            initialize();
+        } catch (RepositoryException e) {
+            throw new AtlasException("Error in reacting to active on initialization", e);
+        } catch (IndexException e) {
+            throw new AtlasException("Error in reacting to active on initialization", e);
+        }
+    }
+
+    @Override
+    public void instanceIsPassive() {
+        LOG.info("Reacting to passive state: No action right now.");
+    }
+
     /* Commenting this out since we do not need an index for edge label here
     private void createEdgeMixedIndex(String propertyName) {
         EdgeLabel edgeLabel = management.getEdgeLabel(propertyName);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index 40728bc..cd1161a 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -22,9 +22,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Provider;
+
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.classification.InterfaceAudience;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.listener.TypesChangeListener;
 import org.apache.atlas.repository.MetadataRepository;
@@ -58,6 +62,7 @@ import org.apache.atlas.typesystem.types.TypeUtils.Pair;
 import org.apache.atlas.typesystem.types.ValueConversionException;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.utils.ParamChecker;
+import org.apache.commons.configuration.Configuration;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -71,13 +76,14 @@ import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper over TypeSystem and MetadataRepository services with hooks
  * for listening to changes to the repository.
  */
 @Singleton
-public class DefaultMetadataService implements MetadataService {
+public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
 
@@ -89,6 +95,8 @@ public class DefaultMetadataService implements MetadataService {
     private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
     private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
 
+    private boolean wasInitialized = false;
+
     @Inject
     DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
                            final IBootstrapTypesRegistrar typesRegistrar,
@@ -96,14 +104,15 @@ public class DefaultMetadataService implements MetadataService {
                            final Collection<Provider<EntityChangeListener>> entityListenerProviders)
             throws AtlasException {
         this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
-                TypeSystem.getInstance());
+                TypeSystem.getInstance(), ApplicationProperties.get());
     }
 
     DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
                            final IBootstrapTypesRegistrar typesRegistrar,
                            final Collection<Provider<TypesChangeListener>> typeListenerProviders,
                            final Collection<Provider<EntityChangeListener>> entityListenerProviders,
-                           final TypeSystem typeSystem) throws AtlasException {
+                           final TypeSystem typeSystem,
+                           final Configuration configuration) throws AtlasException {
         this.typeStore = typeStore;
         this.typesRegistrar = typesRegistrar;
         this.typeSystem = typeSystem;
@@ -117,25 +126,37 @@ public class DefaultMetadataService implements MetadataService {
             entityChangeListeners.add(provider.get());
         }
 
-        restoreTypeSystem();
-
-        typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
+        if (!HAConfiguration.isHAEnabled(configuration)) {
+            restoreTypeSystem();
+        }
     }
 
-    private void restoreTypeSystem() {
+    private void restoreTypeSystem() throws AtlasException {
         LOG.info("Restoring type system from the store");
-        try {
-            TypesDef typesDef = typeStore.restore();
+        TypesDef typesDef = typeStore.restore();
+        if (!wasInitialized) {
+            LOG.info("Initializing type system for the first time.");
             typeSystem.defineTypes(typesDef);
 
             // restore types before creating super types
             createSuperTypes();
-        } catch (AtlasException e) {
-            throw new RuntimeException(e);
+            typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
+            wasInitialized = true;
+        } else {
+            LOG.info("Type system was already initialized, refreshing cache.");
+            refreshCache(typesDef);
         }
         LOG.info("Restored type system from the store");
     }
 
+    private void refreshCache(TypesDef typesDef) throws AtlasException {
+        TypeSystem.TransientTypeSystem transientTypeSystem
+                = typeSystem.createTransientTypeSystem(typesDef, true);
+        Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded();
+        LOG.info("Number of types got from transient type system: " + typesAdded.size());
+        typeSystem.commitTypes(typesAdded);
+    }
+
     private static final AttributeDefinition NAME_ATTRIBUTE =
             TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE);
     private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
@@ -683,4 +704,23 @@ public class DefaultMetadataService implements MetadataService {
             listener.onEntitiesDeleted(entities);
         }
     }
+
+    /**
+     * Create or restore the {@link TypeSystem} cache on server activation.
+     *
+     * When an instance is passive, types could be created outside of its cache by the active instance.
+     * Hence, when this instance becomes active, it needs to restore the cache from the backend store.
+     * The first time initialization happens, the indices for these types also need to be created.
+     * This must happen only from the active instance, as it updates shared backend state.
+     */
+    @Override
+    public void instanceIsActive() throws AtlasException {
+        LOG.info("Reacting to active state: restoring type system");
+        restoreTypeSystem();
+    }
+
+    @Override
+    public void instanceIsPassive() {
+        LOG.info("Reacting to passive state: no action right now");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java
new file mode 100644
index 0000000..2f7edb4
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class HBaseBasedAuditRepositoryHATest {
+
+    @Mock
+    private Configuration configuration;
+
+    @Mock
+    private org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Mock
+    private Connection connection;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testTableShouldNotBeCreatedOnStartIfHAIsEnabled() throws IOException, AtlasException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
+                HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)).
+                thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME);
+        HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() {
+            @Override
+            protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) {
+                return connection;
+            }
+        };
+        auditRepository.startInternal(configuration, hbaseConf);
+
+        verifyZeroInteractions(connection);
+    }
+
+    @Test
+    public void testShouldCreateTableWhenReactingToActive() throws AtlasException, IOException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
+                HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)).
+                thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME);
+        TableName tableName = TableName.valueOf(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME);
+        Admin admin = mock(Admin.class);
+        when(connection.getAdmin()).thenReturn(admin);
+        when(admin.tableExists(tableName)).thenReturn(true);
+        HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() {
+            @Override
+            protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) {
+                return connection;
+            }
+        };
+        auditRepository.startInternal(configuration, hbaseConf);
+        auditRepository.instanceIsActive();
+
+        verify(connection).getAdmin();
+        verify(admin).tableExists(tableName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java
new file mode 100644
index 0000000..87fdf87
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graph;
+
+import com.thinkaurelius.titan.core.TitanGraph;
+import com.thinkaurelius.titan.core.schema.TitanManagement;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.IndexException;
+import org.apache.atlas.repository.RepositoryException;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class GraphBackedSearchIndexerTest {
+
+    @Mock
+    private Configuration configuration;
+
+    @Mock
+    private GraphProvider<TitanGraph> graphProvider;
+
+    @Mock
+    private TitanGraph titanGraph;
+
+    @Mock
+    private TitanManagement titanManagement;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testSearchIndicesAreInitializedOnConstructionWhenHAIsDisabled() throws IndexException, RepositoryException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
+        when(graphProvider.get()).thenReturn(titanGraph);
+        when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
+        when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
+
+        GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
+
+        verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
+    }
+
+    @Test
+    public void testSearchIndicesAreNotInitializedOnConstructionWhenHAIsEnabled() throws IndexException, RepositoryException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(graphProvider.get()).thenReturn(titanGraph);
+        when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
+        when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
+
+        GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
+
+        verifyZeroInteractions(titanManagement);
+
+    }
+
+    @Test
+    public void testIndicesAreReinitializedWhenServerBecomesActive() throws AtlasException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(graphProvider.get()).thenReturn(titanGraph);
+        when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
+        when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
+
+        GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
+        graphBackedSearchIndexer.instanceIsActive();
+
+        verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
index 0685e19..effee2a 100644
--- a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
@@ -25,28 +25,126 @@ import org.apache.atlas.listener.TypesChangeListener;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.typestore.ITypeStore;
 import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.TypesChangeListener;
+import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.typestore.ITypeStore;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 public class DefaultMetadataServiceMockTest {
 
+    @Mock
+    private IBootstrapTypesRegistrar typesRegistrar;
+
+    @Mock
+    private TypeSystem typeSystem;
+
+    @Mock
+    private MetadataRepository metadataRepository;
+
+    @Mock
+    private ITypeStore typeStore;
+
+    @Mock
+    private Configuration configuration;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
     @Test
     public void testShouldInvokeTypesRegistrarOnCreation() throws AtlasException {
-        IBootstrapTypesRegistrar typesRegistrar = mock(IBootstrapTypesRegistrar.class);
-        TypeSystem typeSystem = mock(TypeSystem.class);
         when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
         DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class),
                 mock(ITypeStore.class),
                 typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
-                new ArrayList<Provider<EntityChangeListener>>(), typeSystem);
+                new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
+
+        verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
+                typeSystem, defaultMetadataService);
+    }
+
+    @Test
+    public void testShouldNotRestoreTypesIfHAIsEnabled() throws AtlasException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+
+        DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
+                typeStore,
+                typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
+                new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
+
+        verifyZeroInteractions(typeStore);
+        verifyZeroInteractions(typeSystem);
+        verifyZeroInteractions(typesRegistrar);
+    }
+
+    @Test
+    public void testShouldRestoreTypeSystemOnServerActive() throws AtlasException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+
+        TypesDef typesDef = mock(TypesDef.class);
+        when(typeStore.restore()).thenReturn(typesDef);
+        when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
+
+        DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
+                typeStore,
+                typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
+                new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
+        defaultMetadataService.instanceIsActive();
 
+        verify(typeStore).restore();
+        verify(typeSystem).defineTypes(typesDef);
         verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
                 typeSystem, defaultMetadataService);
     }
+
+    @Test
+    public void testShouldOnlyRestoreCacheOnServerActiveIfAlreadyDoneOnce() throws AtlasException {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+
+        TypesDef typesDef = mock(TypesDef.class);
+        when(typeStore.restore()).thenReturn(typesDef);
+        when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
+
+        TypeSystem.TransientTypeSystem transientTypeSystem = mock(TypeSystem.TransientTypeSystem.class);
+        HashMap<String, IDataType> typesAdded = new HashMap<>();
+        when(transientTypeSystem.getTypesAdded()).thenReturn(typesAdded);
+        when(typeSystem.createTransientTypeSystem(typesDef, true)).
+                thenReturn(transientTypeSystem);
+        DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
+                typeStore,
+                typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
+                new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
+
+        defaultMetadataService.instanceIsActive();
+        defaultMetadataService.instanceIsPassive();
+        defaultMetadataService.instanceIsActive();
+
+        verify(typeStore, times(2)).restore();
+        verify(typeSystem, times(1)).defineTypes(typesDef);
+        verify(typesRegistrar, times(1)).
+                registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, defaultMetadataService);
+        verify(typeSystem, times(1)).createTransientTypeSystem(typesDef, true);
+        verify(typeSystem, times(1)).commitTypes(typesAdded);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/pom.xml
----------------------------------------------------------------------
diff --git a/server-api/pom.xml b/server-api/pom.xml
index 93a0358..d3e84c4 100644
--- a/server-api/pom.xml
+++ b/server-api/pom.xml
@@ -47,6 +47,19 @@
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-typesystem</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java
new file mode 100644
index 0000000..06977c5
--- /dev/null
+++ b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.ha;
+
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.security.SecurityProperties;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A wrapper for getting configuration entries related to HighAvailability.
+ */
+public class HAConfiguration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class);
+
+    public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha";
+    public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + ".enabled";
+    public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address.";
+    public static final String ATLAS_SERVER_IDS = "atlas.server.ids";
+    public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ".zookeeper.connect";
+    public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000;
+    public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.retry.sleeptime.ms";
+    public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ".zookeeper.num.retries";
+    public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3;
+    public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.session.timeout.ms";
+    public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000;
+
+    /**
+     * Return whether HA is enabled or not.
+     * @param configuration underlying configuration instance
+     * @return the value of {@link #ATLAS_SERVER_HA_ENABLED_KEY}, or false when the key is not set
+     */
+    public static boolean isHAEnabled(Configuration configuration) {
+        return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false);
+    }
+
+    /**
+     * Return the ID corresponding to this Atlas instance.
+     *
+     * The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key
+     * that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where
+     * the host is a local IP address and port is set in the system property
+     * {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}, which must be set to an integer value.
+     *
+     * @param configuration underlying configuration instance
+     * @return the first configured ID whose host is local and whose port matches the app port
+     * @throws AtlasException if no ID is found that maps to a local IP Address or port
+     */
+    public static String getAtlasServerId(Configuration configuration) throws AtlasException {
+        // ids are already trimmed by the getStringArray call below
+        String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS);
+        String matchingServerId = null;
+        int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));
+        for (String id : ids) {
+            String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +id);
+            if (!StringUtils.isEmpty(hostPort)) {
+                InetSocketAddress socketAddress;
+                try {
+                    socketAddress = NetUtils.createSocketAddr(hostPort);
+                } catch (Exception e) {
+                    LOG.warn("Exception while trying to get socket address for " + hostPort, e);
+                    continue;
+                }
+                if (!socketAddress.isUnresolved()
+                        && NetUtils.isLocalAddress(socketAddress.getAddress())
+                        && appPort == socketAddress.getPort()) {
+                    LOG.info("Found matched server id " + id + " with host port: " + hostPort);
+                    matchingServerId = id;
+                    break;
+                }
+            } else {
+                LOG.info("Could not find matching address entry for id: " + id);
+            }
+        }
+        if (matchingServerId == null) {
+            String msg = String.format("Could not find server id for this instance. " +
+                    "Unable to find IDs matching any local host and port binding among %s",
+                    StringUtils.join(ids, ","));
+            throw new AtlasException(msg);
+        }
+        return matchingServerId;
+    }
+
+    /**
+     * Get the web server address that a server instance with the passed ID is bound to.
+     *
+     * This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether
+     * the URL is http or https.
+     *
+     * @param configuration underlying configuration
+     * @param serverId serverId whose host:port property is picked to build the web server address.
+     * @return "https://" or "http://" followed by the host:port value configured for serverId
+     */
+    public static String getBoundAddressForId(Configuration configuration, String serverId) {
+        String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +serverId);
+        boolean isSecure = configuration.getBoolean(SecurityProperties.TLS_ENABLED);
+        String protocol = (isSecure) ? "https://" : "http://";
+        return protocol + hostPort;
+    }
+
+    /**
+     * A collection of Zookeeper specific configuration that is used by High Availability code
+     */
+    public static class ZookeeperProperties {
+        private String connectString;
+        private int retriesSleepTimeMillis;
+        private int numRetries;
+        private int sessionTimeout;
+
+        public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries,
+                                   int sessionTimeout) {
+            this.connectString = connectString;
+            this.retriesSleepTimeMillis = retriesSleepTimeMillis;
+            this.numRetries = numRetries;
+            this.sessionTimeout = sessionTimeout;
+        }
+
+        public String getConnectString() {
+            return connectString;
+        }
+
+        public int getRetriesSleepTimeMillis() {
+            return retriesSleepTimeMillis;
+        }
+
+        public int getNumRetries() {
+            return numRetries;
+        }
+
+        public int getSessionTimeout() {
+            return sessionTimeout;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            ZookeeperProperties that = (ZookeeperProperties) o;
+
+            if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) return false;
+            if (numRetries != that.numRetries) return false;
+            if (sessionTimeout != that.sessionTimeout) return false;
+            return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null);
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = connectString != null ? connectString.hashCode() : 0;
+            result = 31 * result + retriesSleepTimeMillis;
+            result = 31 * result + numRetries;
+            result = 31 * result + sessionTimeout;
+            return result;
+        }
+    }
+
+    public static ZookeeperProperties getZookeeperProperties(Configuration configuration) {
+        String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect");
+        if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) {
+            zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT);
+        }
+
+        int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS,
+                DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS);
+
+        int numRetries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES);
+
+        int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS,
+                DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
+        return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
new file mode 100644
index 0000000..87a69ef
--- /dev/null
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.listener;
+
+import org.apache.atlas.AtlasException;
+
+/**
+ * An interface that should be implemented by objects and services to react to changes in state of an Atlas server.
+ *
+ * The two state transitions we handle are (1) becoming active and (2) becoming passive.
+ */
+public interface ActiveStateChangeHandler {
+
+    /**
+     * Callback that is invoked on an implementor when this instance of Atlas server is declared the leader.
+     *
+     * Any initialization that must be carried out by an implementor only when the server becomes active
+     * should happen on this callback.
+     *
+     * @throws AtlasException if anything is wrong on initialization
+     */
+    void instanceIsActive() throws AtlasException;
+
+    /**
+     * Callback that is invoked on an implementor when this instance of Atlas server is removed as the leader.
+     *
+     * Any cleanup that must be carried out by an implementor when the server becomes passive
+     * should happen on this callback.
+     *
+     * @throws AtlasException if anything is wrong on shutdown
+     */
+    void instanceIsPassive() throws AtlasException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java
new file mode 100644
index 0000000..a7c9f37
--- /dev/null
+++ b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.ha;
+
+import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.security.SecurityProperties;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+public class HAConfigurationTest {
+
+    @Mock
+    private Configuration configuration;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, AtlasConstants.DEFAULT_APP_PORT_STR);
+    }
+
+    @Test
+    public void testShouldSelectRightServerAddress() throws AtlasException {
+        when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"});
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000");
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:21000");
+        // same host for both ids: the match must come from the port set in setup()
+        String atlasServerId = HAConfiguration.getAtlasServerId(configuration);
+        assertEquals(atlasServerId, "id2");
+    }
+
+    @Test(expectedExceptions = AtlasException.class)
+    public void testShouldFailIfNoIDsConfiguration() throws AtlasException {
+        when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {});
+        HAConfiguration.getAtlasServerId(configuration);
+        fail("Should not return any server id if IDs not found in configuration");
+    }
+
+    @Test(expectedExceptions = AtlasException.class)
+    public void testShouldFailIfNoMatchingAddressForID() throws AtlasException {
+        when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"});
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000");
+        // id2 has no address stubbed and id1 uses port 31000, so no id is expected to match
+        HAConfiguration.getAtlasServerId(configuration);
+        fail("Should not return any server id if no matching address found for any ID");
+    }
+
+    @Test
+    public void testShouldReturnHTTPBoundAddress() {
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
+        when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(false);
+
+        String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
+
+        assertEquals(address, "http://127.0.0.1:21000");
+    }
+
+    @Test
+    public void testShouldReturnHTTPSBoundAddress() {
+        when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443");
+        when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true);
+
+        String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
+
+        assertEquals(address, "https://127.0.0.1:21443");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
index b41f3db..402800e 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
@@ -27,11 +27,14 @@ import org.apache.atlas.classification.InterfaceAudience;
 import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.exception.TypeExistsException;
 import org.apache.atlas.typesystem.exception.TypeNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Singleton;
 import java.lang.reflect.Constructor;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +46,8 @@ import java.util.concurrent.ConcurrentHashMap;
 @Singleton
 @InterfaceAudience.Private
 public class TypeSystem {
+    private static final Logger LOG = LoggerFactory.getLogger(TypeSystem.class);
+
     private static final TypeSystem INSTANCE = new TypeSystem();
     private static ThreadLocal<SimpleDateFormat> dateFormat = new ThreadLocal<SimpleDateFormat>() {
         @Override
@@ -333,7 +338,10 @@ public class TypeSystem {
             IDataType type = typeEntry.getValue();
             //Add/replace the new type in the typesystem
             types.put(typeName, type);
-            typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName);
+            // ArrayListMultiMap allows duplicates - we want to avoid this during re-activation.
+            if (!typeCategoriesToTypeNamesMap.containsEntry(type.getTypeCategory(), typeName)) {
+                typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/typesystem/src/main/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties
index 9a32e04..f753785 100644
--- a/typesystem/src/main/resources/atlas-application.properties
+++ b/typesystem/src/main/resources/atlas-application.properties
@@ -87,4 +87,8 @@ atlas.server.https.port=31443
 
 hbase.security.authentication=simple
 
-atlas.hook.falcon.synchronous=true
\ No newline at end of file
+atlas.hook.falcon.synchronous=true
+#########  High Availability Configuration ########
+atlas.server.ha.enabled=false
+atlas.server.ids=id1
+atlas.server.address.id1=localhost:21000

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java
----------------------------------------------------------------------
diff --git a/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java
index f9f5f21..a3be4c5 100755
--- a/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java
+++ b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java
@@ -32,12 +32,14 @@ import org.testng.annotations.Test;
 import scala.actors.threadpool.Arrays;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef;
 import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef;
 import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef;
 import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef;
+import static org.testng.Assert.assertTrue;
 
 public class TypeSystemTest extends BaseTest {
 
@@ -55,7 +57,7 @@ public class TypeSystemTest extends BaseTest {
     public void testGetTypeNames() throws Exception {
         getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2),
                 new EnumValue("3", 3));
-        Assert.assertTrue(getTypeSystem().getTypeNames().contains("enum_test"));
+        assertTrue(getTypeSystem().getTypeNames().contains("enum_test"));
     }
 
     @Test
@@ -65,7 +67,7 @@ public class TypeSystemTest extends BaseTest {
         String typeDescription = typeName + description;
         getTypeSystem().defineEnumType(typeName, typeDescription, new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2),
                 new EnumValue("3", 3));
-        Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName));
+        assertTrue(getTypeSystem().getTypeNames().contains(typeName));
         IDataType type = getTypeSystem().getDataType(EnumType.class, typeName);
         Assert.assertNotNull(type);
         Assert.assertEquals(type.getDescription(), typeDescription);
@@ -76,7 +78,7 @@ public class TypeSystemTest extends BaseTest {
             .createTraitTypeDef(typeName, typeDescription, ImmutableSet.<String>of(),
                 TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE));
         getTypeSystem().defineTraitType(trait);
-        Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName));
+        assertTrue(getTypeSystem().getTypeNames().contains(typeName));
         type = getTypeSystem().getDataType(TraitType.class, typeName);
         Assert.assertNotNull(type);
         Assert.assertEquals(type.getDescription(), typeDescription);
@@ -87,7 +89,7 @@ public class TypeSystemTest extends BaseTest {
             .createClassTypeDef(typeName, typeDescription, ImmutableSet.<String>of(),
                 TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE));
         getTypeSystem().defineClassType(classType);
-        Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName));
+        assertTrue(getTypeSystem().getTypeNames().contains(typeName));
         type = getTypeSystem().getDataType(ClassType.class, typeName);
         Assert.assertNotNull(type);
         Assert.assertEquals(type.getDescription(), typeDescription);
@@ -95,7 +97,7 @@ public class TypeSystemTest extends BaseTest {
         typeName = "struct_type";
         typeDescription = typeName + description;
         getTypeSystem().defineStructType(typeName, typeDescription, true, createRequiredAttrDef("a", DataTypes.INT_TYPE));
-        Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName));
+        assertTrue(getTypeSystem().getTypeNames().contains(typeName));
         type = getTypeSystem().getDataType(StructType.class, typeName);
         Assert.assertNotNull(type);
         Assert.assertEquals(type.getDescription(), typeDescription);
@@ -106,7 +108,7 @@ public class TypeSystemTest extends BaseTest {
     public void testIsRegistered() throws Exception {
         getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2),
                 new EnumValue("3", 3));
-        Assert.assertTrue(getTypeSystem().isRegistered("enum_test"));
+        assertTrue(getTypeSystem().isRegistered("enum_test"));
     }
 
     @Test
@@ -182,9 +184,9 @@ public class TypeSystemTest extends BaseTest {
         ClassType bc = ts.getDataType(ClassType.class, "B");
         ClassType cc = ts.getDataType(ClassType.class, "C");
 
-        Assert.assertTrue(ac.compareTo(bc) < 0);
-        Assert.assertTrue(bc.compareTo(cc) < 0);
-        Assert.assertTrue(ac.compareTo(cc) < 0);
+        assertTrue(ac.compareTo(bc) < 0);
+        assertTrue(bc.compareTo(cc) < 0);
+        assertTrue(ac.compareTo(cc) < 0);
     }
 
     @Test
@@ -223,4 +225,31 @@ public class TypeSystemTest extends BaseTest {
         Assert.assertEquals(traitNames.size(), 4);
         Assert.assertEquals(classNames.size(), 3);
     }
+
+    @Test
+    public void testTypeNamesAreNotDuplicated() {
+        TypeSystem typeSystem = getTypeSystem();
+        ImmutableList<String> traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT);
+        int numTraits = traitNames.size();
+
+        HashMap<String, IDataType> typesAdded = new HashMap<>();
+        String traitName = "dup_type_test" + random();
+        TraitType traitType = new TraitType(typeSystem, traitName, null, null, 0);
+        typesAdded.put(traitName, traitType);
+        typeSystem.commitTypes(typesAdded);
+
+        traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT);
+        Assert.assertEquals(traitNames.size(), numTraits+1);
+
+        // commit again: the map still holds the first trait plus a new one, so only one name is added
+        traitName = "dup_type_test" + random();
+        TraitType traitTypeNew = new TraitType(typeSystem, traitName, null, null, 0);
+        typesAdded.put(traitName, traitTypeNew);
+
+        typeSystem.commitTypes(typesAdded);
+        traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT);
+        Assert.assertEquals(traitNames.size(), numTraits+2);
+    }
+
+
 }



Mime
View raw message