tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject [2/2] git commit: TAJO-704: TajoMaster HA (jaehwa)
Date Tue, 19 Aug 2014 14:48:03 GMT
TAJO-704: TajoMaster HA (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b16d13ad
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b16d13ad
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b16d13ad

Branch: refs/heads/master
Commit: b16d13addacc8c03f7b46b912a02231cea4c0861
Parents: 35c2492
Author: Jaehwa Jung <blrunner@apache.org>
Authored: Tue Aug 19 23:44:21 2014 +0900
Committer: Jaehwa Jung <blrunner@apache.org>
Committed: Tue Aug 19 23:44:21 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/catalog/AbstractCatalogClient.java     |  75 +++--
 .../org/apache/tajo/catalog/CatalogServer.java  |   2 +-
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  24 +-
 .../org/apache/tajo/cli/TajoHAAdminCommand.java |  57 ++++
 .../java/org/apache/tajo/client/TajoAdmin.java  |  27 +-
 .../java/org/apache/tajo/client/TajoClient.java |  78 +++--
 .../org/apache/tajo/client/TajoGetConf.java     |   4 +-
 .../org/apache/tajo/client/TajoHAAdmin.java     | 210 +++++++++++++
 .../apache/tajo/client/TajoHAClientUtil.java    |  87 ++++++
 .../java/org/apache/tajo/TajoConstants.java     |   5 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   8 +
 .../org/apache/tajo/util/HAServiceUtil.java     | 293 +++++++++++++++++++
 .../apache/tajo/master/TajoContainerProxy.java  |  24 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  36 +++
 .../org/apache/tajo/master/ha/HAService.java    |  56 ++++
 .../tajo/master/ha/HAServiceHDFSImpl.java       | 278 ++++++++++++++++++
 .../apache/tajo/master/ha/TajoMasterInfo.java   |  89 ++++++
 .../tajo/master/querymaster/QueryMaster.java    |  91 +++++-
 .../master/querymaster/QueryMasterTask.java     |  24 +-
 .../tajo/webapp/QueryExecutorServlet.java       |   5 +
 .../tajo/worker/TajoResourceAllocator.java      |  29 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  36 ++-
 .../tajo/worker/WorkerHeartbeatService.java     |  18 +-
 .../resources/webapps/admin/catalogview.jsp     |  13 +-
 .../main/resources/webapps/admin/cluster.jsp    |  87 +++++-
 .../src/main/resources/webapps/admin/index.jsp  |  47 ++-
 .../src/main/resources/webapps/admin/query.jsp  |  13 +-
 .../resources/webapps/admin/query_executor.jsp  |  13 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   7 +-
 .../tajo/master/ha/TestHAServiceHDFSImpl.java   | 133 +++++++++
 tajo-dist/src/main/bin/start-tajo.sh            |  15 +-
 tajo-dist/src/main/bin/stop-tajo.sh             |  16 +-
 tajo-dist/src/main/bin/tajo                     |   5 +
 tajo-dist/src/main/bin/tajo-daemons.sh          |   1 +
 tajo-docs/src/main/sphinx/configuration.rst     |   3 +-
 .../sphinx/configuration/ha_configuration.rst   | 135 +++++++++
 37 files changed, 1937 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 841eb49..b605c71 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.9.0 - unreleased
 
   NEW FEATURES 
 
+    TAJO-704: TajoMaster HA (jaehwa)
+
     TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik)
 
     TAJO-774: Implement logical plan part and physical executor for window 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 542214b..1f1e808 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -33,6 +33,7 @@ import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.net.InetSocketAddress;
@@ -58,10 +59,26 @@ public abstract class AbstractCatalogClient implements CatalogService {
     this.conf = conf;
   }
 
+  private InetSocketAddress getCatalogServerAddr() {
+    if (catalogServerAddr == null) {
+      return null;
+    } else {
+      if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+        return catalogServerAddr;
+      } else {
+        if (!HAServiceUtil.isMasterAlive(catalogServerAddr, conf)) {
+          return HAServiceUtil.getCatalogAddress(conf);
+        } else {
+          return catalogServerAddr;
+        }
+      }
+    }
+  }
+
   @Override
   public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) {
     try {
-      return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
 
@@ -80,7 +97,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
@@ -95,7 +112,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
@@ -110,7 +127,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllTablespaceNames() {
     try {
-      return new ServerCallable<Collection<String>>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<String> call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
@@ -126,7 +143,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public TablespaceProto getTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<TablespaceProto>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<TablespaceProto>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public TablespaceProto call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
@@ -141,7 +158,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) {
     try {
-      return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.alterTablespace(null, alterTablespace).getValue();
@@ -156,7 +173,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
 
@@ -177,7 +194,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
@@ -192,7 +209,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
@@ -207,7 +224,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllDatabaseNames() {
     try {
-      return new ServerCallable<Collection<String>>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<String> call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
@@ -223,7 +240,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final TableDesc getTableDesc(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<TableDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<TableDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public TableDesc call(NettyClientBase client) throws ServiceException {
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
           builder.setDatabaseName(databaseName);
@@ -248,7 +265,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<PartitionMethodDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<PartitionMethodDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -268,7 +285,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -288,7 +305,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllTableNames(final String databaseName) {
     try {
-      return new ServerCallable<Collection<String>>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<String>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<String> call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
@@ -304,7 +321,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<FunctionDesc> getFunctions() {
     try {
-      return new ServerCallable<Collection<FunctionDesc>>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<FunctionDesc>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
           List<FunctionDesc> list = new ArrayList<FunctionDesc>();
           GetFunctionsResponse response;
@@ -331,7 +348,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createTable(final TableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.createTable(null, desc.getProto()).getValue();
@@ -350,7 +367,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
     final String simpleName = splitted[1];
 
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -374,7 +391,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
           "tableName cannot be composed of multiple parts, but it is \"" + tableName + "\"");
     }
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -399,7 +416,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createIndex(final IndexDesc index) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.createIndex(null, index.getProto()).getValue();
@@ -414,7 +431,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           IndexNameProto.Builder builder = IndexNameProto.newBuilder();
           builder.setDatabaseName(databaseName);
@@ -433,7 +450,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
@@ -453,7 +470,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final IndexDesc getIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<IndexDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public IndexDesc call(NettyClientBase client) throws ServiceException {
 
           IndexNameProto.Builder builder = IndexNameProto.newBuilder();
@@ -475,7 +492,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
                                           final String tableName,
                                           final String columnName) {
     try {
-      return new ServerCallable<IndexDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public IndexDesc call(NettyClientBase client) throws ServiceException {
 
           GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
@@ -496,7 +513,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public boolean dropIndex(final String databaseName,
                            final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           IndexNameProto.Builder builder = IndexNameProto.newBuilder();
@@ -516,7 +533,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.createFunction(null, funcDesc.getProto()).getValue();
@@ -531,7 +548,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean dropFunction(final String signature) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
           builder.setSignature(signature);
@@ -564,7 +581,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
 
     FunctionDescProto descProto = null;
     try {
-      descProto = new ServerCallable<FunctionDescProto>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      descProto = new ServerCallable<FunctionDescProto>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public FunctionDescProto call(NettyClientBase client) throws ServiceException {
           try {
             CatalogProtocolService.BlockingInterface stub = getStub(client);
@@ -614,7 +631,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
     }
 
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.containFunction(null, builder.build()).getValue();
@@ -629,7 +646,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean alterTable(final AlterTableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.alterTable(null, desc.getProto()).getValue();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 4e391aa..cf3ea6f 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -887,7 +887,7 @@ public class CatalogServer extends AbstractService {
         } else {
           for (FunctionDescProto existing : functions.get(signature)) {
             if (existing.getParameterTypesList() != null &&
-                  CatalogUtil.isMatchedFunction(existing.getParameterTypesList(), params)) {
+                CatalogUtil.isMatchedFunction(existing.getParameterTypesList(), params)) {
               return existing;
             }
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index fdc766e..2f9e0b2 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -28,10 +28,12 @@ import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoHAClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.HAServiceUtil;
 
 import java.io.*;
 import java.lang.reflect.Constructor;
@@ -81,7 +83,8 @@ public class TajoCli {
       ExecExternalShellCommand.class,
       HdfsCommand.class,
       TajoAdminCommand.class,
-      TajoGetConfCommand.class
+      TajoGetConfCommand.class,
+      TajoHAAdminCommand.class
   };
   private final Map<String, TajoShellCommand> commands = new TreeMap<String, TajoShellCommand>();
 
@@ -226,6 +229,7 @@ public class TajoCli {
       client = new TajoClient(conf, baseDatabase);
     }
 
+    checkMasterStatus();
     context.setCurrentDatabase(client.getCurrentDatabase());
     initHistory();
     initCommands();
@@ -419,6 +423,7 @@ public class TajoCli {
   }
 
   public int executeMetaCommand(String line) throws Exception {
+    checkMasterStatus();
     String [] metaCommands = line.split(";");
     for (String metaCommand : metaCommands) {
       String arguments [] = metaCommand.split(" ");
@@ -452,7 +457,8 @@ public class TajoCli {
     return 0;
   }
 
-  private void executeJsonQuery(String json) throws ServiceException {
+  private void executeJsonQuery(String json) throws ServiceException, IOException {
+    checkMasterStatus();
     long startTime = System.currentTimeMillis();
     ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json);
     if (response == null) {
@@ -478,7 +484,8 @@ public class TajoCli {
     }
   }
 
-  private int executeQuery(String statement) throws ServiceException {
+  private int executeQuery(String statement) throws ServiceException, IOException {
+    checkMasterStatus();
     long startTime = System.currentTimeMillis();
     ClientProtos.SubmitQueryResponse response = client.executeQuery(statement);
     if (response == null) {
@@ -626,6 +633,17 @@ public class TajoCli {
     }
   }
 
+  private void checkMasterStatus() throws IOException, ServiceException {
+    String sessionId = client.getSessionId() != null ? client.getSessionId().getId() : null;
+    client = TajoHAClientUtil.getTajoClient(conf, client, context);
+    if(sessionId != null && (client.getSessionId() == null ||
+        !sessionId.equals(client.getSessionId().getId()))) {
+      commands.clear();
+      initHistory();
+      initCommands();
+    }
+  }
+
   public static void main(String [] args) throws Exception {
     TajoConf conf = new TajoConf();
     TajoCli shell = new TajoCli(conf, args, System.in, System.out);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java
new file mode 100644
index 0000000..ad88b3f
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.client.TajoHAAdmin;
+
+public class TajoHAAdminCommand extends TajoShellCommand {
+  private TajoHAAdmin haAdmin;
+
+  public TajoHAAdminCommand(TajoCli.TajoCliContext context) {
+    super(context);
+    haAdmin = new TajoHAAdmin(context.getConf(), context.getOutput(), context.getTajoClient());
+  }
+
+  @Override
+  public String getCommand() {
+    return "\\haadmin";
+  }
+
+  @Override
+  public void invoke(String[] command) throws Exception {
+    try {
+      String[] haAdminCommands = new String[command.length - 1];
+      System.arraycopy(command, 1, haAdminCommands, 0, haAdminCommands.length);
+
+      haAdmin.runCommand(haAdminCommands);
+    } catch (Exception e) {
+      context.getOutput().println("ERROR: " + e.getMessage());
+    }
+  }
+
+  @Override
+  public String getUsage() {
+    return "<command> [options]";
+  }
+
+  @Override
+  public String getDescription() {
+    return "execute a tajo haAdmin command.";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 58b7184..95dfc68 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -27,6 +27,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
 import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
@@ -173,6 +174,7 @@ public class TajoAdmin {
 
   private void processDesc(Writer writer) throws ParseException, IOException,
       ServiceException, SQLException {
+    tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
     List<BriefQueryInfo> queryList = tajoClient.getRunningQueryList();
     SimpleDateFormat df = new SimpleDateFormat(DATE_FORMAT);
     int id = 1;
@@ -212,6 +214,7 @@ public class TajoAdmin {
 
   private void processCluster(Writer writer) throws ParseException, IOException,
       ServiceException, SQLException {
+    tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
     List<WorkerResourceInfo> workerList = tajoClient.getClusterInfo();
 
     int runningQueryMasterTasks = 0;
@@ -376,6 +379,7 @@ public class TajoAdmin {
 
   private void processList(Writer writer) throws ParseException, IOException,
       ServiceException, SQLException {
+    tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
     List<BriefQueryInfo> queryList = tajoClient.getRunningQueryList();
     SimpleDateFormat df = new SimpleDateFormat(DATE_FORMAT);
     StringBuilder builder = new StringBuilder();
@@ -416,10 +420,25 @@ public class TajoAdmin {
 
   private void processMasters(Writer writer) throws ParseException, IOException,
       ServiceException, SQLException {
-    String confMasterServiceAddr = tajoClient.getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
-    InetSocketAddress masterAddress = NetUtils.createSocketAddr(confMasterServiceAddr);
-    writer.write(masterAddress.getHostName());
-    writer.write("\n");
+    // Swap in a client for the current master if HA is on and the configured one is down.
+    tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
+    if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      // HA disabled: print the host of the configured umbilical RPC address.
+      String umbilicalAddr = tajoClient.getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+      InetSocketAddress umbilical = NetUtils.createSocketAddr(umbilicalAddr);
+      writer.write(umbilical.getHostName());
+      writer.write("\n");
+      return;
+    }
+
+    // HA enabled: print every entry of HAServiceUtil.getMasters on one line, space-separated.
+    String separator = "";
+    for (String master : HAServiceUtil.getMasters(tajoConf)) {
+      writer.write(separator);
+      writer.write(master);
+      separator = " ";
+    }
+    writer.write("\n");
   }
 
   public static void main(String [] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 3a90a48..cc993f3 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -29,7 +29,10 @@ import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.annotation.ThreadSafe;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.cli.InvalidClientSessionException;
 import org.apache.tajo.conf.TajoConf;
@@ -46,6 +49,7 @@ import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 
@@ -130,15 +134,30 @@ public class TajoClient implements Closeable {
     return sessionId;
   }
 
+  /**
+   * Picks the master address for the next call: the {@code tajoMasterAddr} field, unless HA
+   * is enabled and that master is not alive, in which case HAServiceUtil supplies the address.
+   */
+  private InetSocketAddress getTajoMasterAddr() {
+    boolean haEnabled = conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE);
+    if (haEnabled && !HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
+      return HAServiceUtil.getMasterClientAddress(conf);
+    }
+    return tajoMasterAddr;
+  }
+
+  public String getBaseDatabase() {
+    return baseDatabase;
+  }
+
   @Override
   public void close() {
     // remove session
     try {
-      NettyClientBase client = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
+      NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
       tajoMaster.removeSession(null, sessionId);
     } catch (Exception e) {
-      LOG.error(e);
     }
 
     if(connPool != null) {
@@ -203,7 +222,7 @@ public class TajoClient implements Closeable {
   }
 
   public String getCurrentDatabase() throws ServiceException {
-    return new ServerCallable<String>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
 
       public String call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -215,7 +234,7 @@ public class TajoClient implements Closeable {
   }
 
   public Boolean selectDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -227,7 +246,7 @@ public class TajoClient implements Closeable {
   }
 
   public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -245,7 +264,7 @@ public class TajoClient implements Closeable {
   }
 
   public Boolean unsetSessionVariables(final List<String> variables)  throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -260,7 +279,7 @@ public class TajoClient implements Closeable {
   }
 
   public String getSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<String>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
 
       public String call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -272,7 +291,7 @@ public class TajoClient implements Closeable {
   }
 
   public Boolean existSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -284,7 +303,7 @@ public class TajoClient implements Closeable {
   }
 
   public Map<String, String> getAllSessionVariables() throws ServiceException {
-    return new ServerCallable<Map<String, String>>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class,
+    return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class,
         false, true) {
 
       public Map<String, String> call(NettyClientBase client) throws ServiceException {
@@ -304,7 +323,7 @@ public class TajoClient implements Closeable {
    * or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
    */
   public SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
-    return new ServerCallable<SubmitQueryResponse>(connPool, tajoMasterAddr,
+    return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -320,7 +339,7 @@ public class TajoClient implements Closeable {
   }
 
   public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
-    return new ServerCallable<SubmitQueryResponse>(connPool, tajoMasterAddr,
+    return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -412,7 +431,7 @@ public class TajoClient implements Closeable {
     } else {
       NettyClientBase tmClient = null;
       try {
-        tmClient = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
+        tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
 
         checkSessionAndGet(tmClient);
         builder.setSessionId(sessionId);
@@ -578,7 +597,7 @@ public class TajoClient implements Closeable {
   }
 
   public boolean updateQuery(final String sql) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -602,7 +621,7 @@ public class TajoClient implements Closeable {
   }
 
   public boolean updateQueryWithJson(final String json) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -633,7 +652,7 @@ public class TajoClient implements Closeable {
    * @throws ServiceException
    */
   public boolean createDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -650,7 +669,7 @@ public class TajoClient implements Closeable {
    * @throws ServiceException
    */
   public boolean existDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -667,7 +686,7 @@ public class TajoClient implements Closeable {
    * @throws ServiceException
    */
   public boolean dropDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -677,7 +696,7 @@ public class TajoClient implements Closeable {
   }
 
   public List<String> getAllDatabaseNames() throws ServiceException {
-    return new ServerCallable<List<String>>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
       public List<String> call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -693,7 +712,7 @@ public class TajoClient implements Closeable {
    * @return True if so.
    */
   public boolean existTable(final String tableName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -718,7 +737,7 @@ public class TajoClient implements Closeable {
   public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
                                        final TableMeta meta)
       throws SQLException, ServiceException {
-    return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
+    return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
         checkSessionAndGet(client);
@@ -759,7 +778,7 @@ public class TajoClient implements Closeable {
    * @return True if the table is dropped successfully.
    */
   public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -777,7 +796,7 @@ public class TajoClient implements Closeable {
   }
 
   public List<BriefQueryInfo> getRunningQueryList() throws ServiceException {
-    return new ServerCallable<List<BriefQueryInfo>>(connPool, tajoMasterAddr,
+    return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -792,7 +811,7 @@ public class TajoClient implements Closeable {
   }
 
   public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException {
-    return new ServerCallable<List<BriefQueryInfo>>(connPool, tajoMasterAddr,
+    return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -806,7 +825,7 @@ public class TajoClient implements Closeable {
   }
 
   public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
-    return new ServerCallable<List<WorkerResourceInfo>>(connPool, tajoMasterAddr,
+    return new ServerCallable<List<WorkerResourceInfo>>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -829,7 +848,7 @@ public class TajoClient implements Closeable {
    *                     in the current database of this session.
    */
   public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
-    return new ServerCallable<List<String>>(connPool, tajoMasterAddr,
+    return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public List<String> call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -854,8 +873,7 @@ public class TajoClient implements Closeable {
    * @return Table description
    */
   public TableDesc getTableDesc(final String tableName) throws ServiceException {
-    return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
         checkSessionAndGet(client);
 
@@ -882,7 +900,7 @@ public class TajoClient implements Closeable {
     NettyClientBase tmClient = null;
     try {
       /* send a kill to the TM */
-      tmClient = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
+      tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
 
       checkSessionAndGet(tmClient);
@@ -915,7 +933,7 @@ public class TajoClient implements Closeable {
   }
 
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
-    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, tajoMasterAddr,
+    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
         checkSessionAndGet(client);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
index 32e6382..52e1894 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
@@ -159,8 +159,8 @@ public class TajoGetConf {
   public static void main(String [] args) throws Exception {
     TajoConf conf = new TajoConf();
 
-    Writer writer = new PrintWriter(System.out);    try {
-      System.out.println("### 1000 ###");
+    Writer writer = new PrintWriter(System.out);
+    try {
       TajoGetConf admin = new TajoGetConf(conf, writer);
       admin.runCommand(args, false);
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
new file mode 100644
index 0000000..11cb4ed
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.cli.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.util.HAServiceUtil;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.List;
+
+public class TajoHAAdmin {
+  private static final Options options;
+
+  static {
+    options = new Options();
+    options.addOption("h", "host", true, "Tajo server host");
+    options.addOption("p", "port", true, "Tajo server port");
+    options.addOption("transitionToActive", null, true, "Transitions the master into Active state");
+    options.addOption("transitionToBackup", null, true, "Transitions the master into Backup state");
+    options.addOption("getState", null, true, "Returns the state of the master");
+    options.addOption("formatHA", null, false, "Format HA status on share storage");
+  }
+
+  /** HA sub-command selected on the command line; {@code NONE} when none was given. */
+  private enum Command { NONE, TRANSITION_TO_ACTIVE, TRANSITION_TO_BACKUP, GET_STATE, FORMAT_HA }
+
+  private TajoConf tajoConf;
+  private TajoClient tajoClient;
+  private Writer writer;
+
+  /**
+   * Creates an admin tool with no initial client.
+   *
+   * @param tajoConf configuration read for the master address and the HA switch
+   * @param writer destination of command output
+   */
+  public TajoHAAdmin(TajoConf tajoConf, Writer writer) {
+    this(tajoConf, writer, null);
+  }
+
+  /**
+   * Creates an admin tool with an initial client.
+   *
+   * @param tajoConf configuration read for the master address and the HA switch
+   * @param writer destination of command output
+   * @param tajoClient initial client; {@link #runCommand(String[])} replaces it with its own
+   */
+  public TajoHAAdmin(TajoConf tajoConf, Writer writer, TajoClient tajoClient) {
+    this.tajoConf = tajoConf;
+    this.writer = writer;
+    this.tajoClient = tajoClient;
+  }
+
+  private void printUsage() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp( "haadmin [options]", options );
+  }
+
+  /**
+   * Parses {@code args} and runs the selected HA command, writing the outcome to the writer.
+   * The master address comes from {@code -h}/{@code -p}, falling back to the configured
+   * client RPC address; usage is printed when no HA command is given.
+   *
+   * @param args command-line arguments
+   * @throws Exception if the arguments cannot be parsed or the command fails
+   */
+  public void runCommand(String[] args) throws Exception {
+    // A value-taking command given with nothing after it is reported before parsing.
+    if(args.length == 1 &&
+        (args[0].equalsIgnoreCase("-transitionToActive")
+            || args[0].equalsIgnoreCase("-transitionToBackup")
+            || args[0].equalsIgnoreCase("-getState"))) {
+      writer.write("Not enough arguments: expected 1 but got 0\n");
+      writer.flush();
+      return;
+    }
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+
+    String hostName = null;
+    Integer port = null;
+    if (cmd.hasOption("h")) {
+      hostName = cmd.getOptionValue("h");
+    }
+    if (cmd.hasOption("p")) {
+      port = Integer.parseInt(cmd.getOptionValue("p"));
+    }
+
+    Command command = Command.NONE;
+    String param = "";
+    if (cmd.hasOption("transitionToActive")) {
+      command = Command.TRANSITION_TO_ACTIVE;
+      param = cmd.getOptionValue("transitionToActive");
+    } else if (cmd.hasOption("transitionToBackup")) {
+      command = Command.TRANSITION_TO_BACKUP;
+      param = cmd.getOptionValue("transitionToBackup");
+    } else if (cmd.hasOption("getState")) {
+      command = Command.GET_STATE;
+      param = cmd.getOptionValue("getState");
+    } else if (cmd.hasOption("formatHA")) {
+      command = Command.FORMAT_HA;
+    }
+
+    // Take whichever of host/port was not given from the configured client RPC address.
+    // A configured address without a ":port" part leaves the port unset; that is
+    // reported below as an invalid server address.
+    String confAddr = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
+    if (confAddr != null) {
+      String[] hostAndPort = confAddr.split(":");
+      if (hostName == null && hostAndPort.length > 0) {
+        hostName = hostAndPort[0];
+      }
+      if (port == null && hostAndPort.length > 1) {
+        port = Integer.parseInt(hostAndPort[1]);
+      }
+    }
+
+    if (command == Command.NONE) {
+      printUsage();
+      return;
+    }
+
+    if ((hostName == null) ^ (port == null)) {
+      System.err.println("ERROR: cannot find valid Tajo server address");
+      return;
+    }
+    if (hostName != null) {
+      // Host and port are both known here; point the new client at that address.
+      tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
+    }
+    tajoClient = new TajoClient(tajoConf);
+
+    if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      writer.write("HA is not enabled for this tajo cluster.\n");
+    } else {
+      switch (command) {
+        case TRANSITION_TO_ACTIVE:
+        case TRANSITION_TO_BACKUP:
+          writer.write("Not Yet Implemented\n");
+          break;
+        case GET_STATE:
+          getState(writer, param);
+          break;
+        case FORMAT_HA:
+          formatHA(writer);
+          break;
+        default:
+          printUsage();
+          break;
+      }
+    }
+
+    writer.flush();
+  }
+
+  /**
+   * Writes the HA state of the master named by {@code param}, as reported by
+   * {@code HAServiceUtil.getState}.
+   */
+  private void getState(Writer writer, String param) throws ParseException, IOException,
+      ServiceException {
+    tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
+    int retValue = HAServiceUtil.getState(param, tajoClient.getConf());
+
+    switch (retValue) {
+      case 1:
+        writer.write("The master is active.\n");
+        break;
+      case 0:
+        writer.write("The master is backup.\n");
+        break;
+      case -1:
+        writer.write("Finding failed. - master:" + param + "\n");
+        break;
+      default:
+        writer.write("Cannot find the master. - master:" + param + "\n");
+        break;
+    }
+  }
+
+  /** Writes the outcome of {@code HAServiceUtil.formatHA} for the client's configuration. */
+  private void formatHA(Writer writer) throws ParseException, IOException,
+      ServiceException {
+    int retValue = HAServiceUtil.formatHA(tajoClient.getConf());
+
+    switch (retValue) {
+      case 1:
+        writer.write("Formatting finished successfully.\n");
+        break;
+      case 0:
+        writer.write("If you want to format the ha information, you must shutdown tajo masters "
+            + " before formatting.\n");
+        break;
+      default:
+        writer.write("Cannot format ha information.\n");
+        break;
+    }
+  }
+
+  /**
+   * Command-line entry point. Always ends the JVM explicitly, with status 0 on success
+   * and 1 when the command threw.
+   */
+  public static void main(String [] args) throws Exception {
+    TajoConf conf = new TajoConf();
+
+    int exitCode = 0;
+    Writer writer = new PrintWriter(System.out);
+    try {
+      TajoHAAdmin admin = new TajoHAAdmin(conf, writer);
+      admin.runCommand(args);
+    } catch (Exception e) {
+      // Report the failure here: exiting from the finally block would discard it.
+      exitCode = 1;
+      System.err.println("ERROR: " + e);
+    } finally {
+      writer.close();
+    }
+    System.exit(exitCode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
new file mode 100644
index 0000000..f22d5ba
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.cli.TajoCli.TajoCliContext;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.HAServiceUtil;
+
+import java.io.IOException;
+
+/**
+ * Helper used by client-side tools to keep talking to a live TajoMaster when
+ * TajoMaster HA is enabled.
+ */
+public class TajoHAClientUtil {
+
+  /**
+   * Same as {@link #getTajoClient(TajoConf, TajoClient, TajoCliContext)} with a
+   * {@code null} context, i.e. no database selection is re-applied.
+   *
+   * @param conf configuration to check and, on reconnect, to update
+   * @param client the client currently in use
+   * @return {@code client}, or a new client that replaces it
+   * @throws IOException may be raised while the replacement client is created
+   * @throws ServiceException if re-selecting the database on the new client fails
+   */
+  public static TajoClient getTajoClient(TajoConf conf, TajoClient client)
+      throws IOException, ServiceException {
+    return getTajoClient(conf, client, null);
+  }
+
+  /**
+   * In TajoMaster HA mode, if the master named by the configured client RPC address
+   * is no longer alive, closes {@code client} and connects to the master reported by
+   * {@code HAServiceUtil.getMasterClientName(conf)}. Otherwise (HA disabled, or the
+   * configured master is alive) {@code client} is returned unchanged.
+   *
+   * <p>On reconnect, {@code conf} is updated in place with the new client RPC address,
+   * the new client is opened on the old client's base database, and the database
+   * held by {@code context}, if any, is selected again.
+   *
+   * @param conf configuration to check and, on reconnect, to update
+   * @param client the client currently in use
+   * @param context CLI context whose current database is re-selected; may be {@code null}
+   * @return {@code client}, or a new client that replaces it
+   * @throws IOException may be raised while the replacement client is created
+   * @throws ServiceException if re-selecting the database on the new client fails
+   */
+  public static TajoClient getTajoClient(TajoConf conf, TajoClient client,
+      TajoCliContext context) throws IOException, ServiceException {
+    if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      return client;
+    }
+    String configuredMaster = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
+    if (HAServiceUtil.isMasterAlive(configuredMaster, conf)) {
+      return client;
+    }
+
+    // Read everything needed from the old client before it is closed.
+    String baseDatabase = client.getBaseDatabase();
+    conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
+        HAServiceUtil.getMasterClientName(conf));
+    client.close();
+    TajoClient newClient = new TajoClient(conf, baseDatabase);
+    if (context != null && context.getCurrentDatabase() != null) {
+      newClient.selectDatabase(context.getCurrentDatabase());
+    }
+    return newClient;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
index 5f28f1c..ffe8c02 100644
--- a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
@@ -35,5 +35,10 @@ public class TajoConstants {
 
   public static final String EMPTY_STRING = "";
 
+  public static final String SYSTEM_HA_DIR_NAME = "ha";
+  public static final String SYSTEM_HA_ACTIVE_DIR_NAME = "active";
+  public static final String SYSTEM_HA_BACKUP_DIR_NAME = "backup";
+
+
   private TajoConstants() {}
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 2504c23..f4229e7 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -158,6 +158,10 @@ public class TajoConf extends Configuration {
     TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002"),
     TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080"),
 
+    // Tajo Master HA Configurations
+    TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false),
+    TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
+
     // Resource tracker service
     RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003"),
     RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120 * 1000), // seconds
@@ -597,6 +601,10 @@ public class TajoConf extends Configuration {
     return new Path(getSystemDir(conf), TajoConstants.SYSTEM_RESOURCE_DIR_NAME);
   }
 
+  /**
+   * Returns the path used for TajoMaster HA bookkeeping: the child named
+   * {@code TajoConstants.SYSTEM_HA_DIR_NAME} under the path returned by
+   * {@code getSystemDir(conf)}.
+   *
+   * @param conf the Tajo configuration passed on to {@code getSystemDir}
+   * @return the HA directory path
+   */
+  public static Path getSystemHADir(TajoConf conf) {
+    return new Path(getSystemDir(conf), TajoConstants.SYSTEM_HA_DIR_NAME);
+  }
+
   private static boolean hasScheme(String path) {
     return path.indexOf("file:/") == 0 || path.indexOf("hdfs:/") == 0;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
new file mode 100644
index 0000000..4f03113
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HAServiceUtil {
+
+  // Selector codes passed as the "type" argument of getMasterAddress(TajoConf, int);
+  // each code picks which master service address is resolved.
+  private final static int MASTER_UMBILICAL_RPC_ADDRESS = 1;
+  private final static int MASTER_CLIENT_RPC_ADDRESS = 2;
+  private final static int RESOURCE_TRACKER_RPC_ADDRESS = 3;
+  private final static int CATALOG_ADDRESS = 4;
+  private final static int MASTER_INFO_ADDRESS = 5;
+
+  /**
+   * Resolves the master umbilical RPC address via
+   * {@link #getMasterAddress(TajoConf, int)}.
+   */
+  public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
+    return getMasterAddress(conf, MASTER_UMBILICAL_RPC_ADDRESS);
+  }
+
+  /**
+   * Returns {@link #getMasterUmbilicalAddress(TajoConf)} converted to a string by
+   * {@code NetUtils.normalizeInetSocketAddress}.
+   */
+  public static String getMasterUmbilicalName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
+  }
+
+  /**
+   * Resolves the master client RPC address via
+   * {@link #getMasterAddress(TajoConf, int)}.
+   */
+  public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
+    return getMasterAddress(conf, MASTER_CLIENT_RPC_ADDRESS);
+  }
+
+  /**
+   * Returns {@link #getMasterClientAddress(TajoConf)} converted to a string by
+   * {@code NetUtils.normalizeInetSocketAddress}.
+   */
+  public static String getMasterClientName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
+  }
+
+  /**
+   * Resolves the resource tracker RPC address via
+   * {@link #getMasterAddress(TajoConf, int)}.
+   */
+  public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
+    return getMasterAddress(conf, RESOURCE_TRACKER_RPC_ADDRESS);
+  }
+
+  /**
+   * Returns {@link #getResourceTrackerAddress(TajoConf)} converted to a string by
+   * {@code NetUtils.normalizeInetSocketAddress}.
+   */
+  public static String getResourceTrackerName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
+  }
+
+  /**
+   * Resolves the catalog address via {@link #getMasterAddress(TajoConf, int)}.
+   */
+  public static InetSocketAddress getCatalogAddress(TajoConf conf) {
+    return getMasterAddress(conf, CATALOG_ADDRESS);
+  }
+
+  /**
+   * Returns {@link #getCatalogAddress(TajoConf)} converted to a string by
+   * {@code NetUtils.normalizeInetSocketAddress}.
+   */
+  public static String getCatalogName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
+  }
+
+  /**
+   * Resolves the master info (HTTP) address via
+   * {@link #getMasterAddress(TajoConf, int)}.
+   */
+  public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
+    return getMasterAddress(conf, MASTER_INFO_ADDRESS);
+  }
+
+  /**
+   * Returns {@link #getMasterInfoAddress(TajoConf)} converted to a string by
+   * {@code NetUtils.normalizeInetSocketAddress}.
+   */
+  public static String getMasterInfoName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
+  }
+
+  public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
+    InetSocketAddress masterAddress = null;
+
+    if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      try {
+        FileSystem fs = getFileSystem(conf);
+        Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+
+        if (fs.exists(activePath)) {
+          FileStatus[] files = fs.listStatus(activePath);
+
+          if (files.length == 1) {
+            Path file = files[0].getPath();
+            String hostAddress = file.getName().replaceAll("_", ":");
+            FSDataInputStream stream = fs.open(file);
+            String data = stream.readUTF();
+            stream.close();
+
+            String[] addresses = data.split("_");
+
+            switch (type) {
+              case 1:
+                masterAddress = NetUtils.createSocketAddr(hostAddress);
+                break;
+              case 2:
+                masterAddress = NetUtils.createSocketAddr(addresses[0]);
+                break;
+              case 3:
+                masterAddress = NetUtils.createSocketAddr(addresses[1]);
+                break;
+              case 4:
+                masterAddress = NetUtils.createSocketAddr(addresses[2]);
+                break;
+              case 5:
+                masterAddress = NetUtils.createSocketAddr(addresses[3]);
+                break;
+              default:
+                break;
+            }
+          }
+        }
+
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    if (masterAddress == null) {
+      switch (type) {
+        case 1:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+          break;
+        case 2:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .TAJO_MASTER_CLIENT_RPC_ADDRESS));
+          break;
+        case 3:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .RESOURCE_TRACKER_RPC_ADDRESS));
+          break;
+        case 4:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .CATALOG_ADDRESS));
+          break;
+        case 5:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .TAJO_MASTER_INFO_ADDRESS));
+          break;
+        default:
+          break;
+      }
+    }
+
+    return masterAddress;
+  }
+
+  /**
+   * Convenience overload: converts {@code masterAddress} to a string with
+   * {@code NetUtils.normalizeInetSocketAddress} and delegates to
+   * {@link #isMasterAlive(String, TajoConf)}.
+   */
+  public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
+    return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
+  }
+
+  /**
+   * Probes a master by opening a TCP connection to it.
+   *
+   * <p>The connection uses Hadoop's default socket factory and the IPC client connect
+   * timeout from {@code conf}. The probing socket is always closed before returning,
+   * so repeated probes do not leak sockets.
+   *
+   * @param masterName the master address as a string accepted by Hadoop's
+   *                   {@code NetUtils.createSocketAddr}
+   * @param conf       the configuration supplying the socket factory and timeout
+   * @return {@code true} if the connection was established; {@code false} if
+   *         anything failed while resolving or connecting
+   */
+  public static boolean isMasterAlive(String masterName, TajoConf conf) {
+    Socket socket = null;
+
+    try {
+      // how to create sockets
+      SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
+
+      int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
+          CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
+
+      InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
+
+      // connected socket
+      socket = socketFactory.createSocket();
+      org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
+      return true;
+    } catch (Exception e) {
+      // Any failure is reported as "not alive".
+      return false;
+    } finally {
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // The probe result is already decided; a failed close cannot change it.
+        }
+      }
+    }
+  }
+
+  /**
+   * Reports which HA directory, if any, holds an entry for the given master.
+   *
+   * @param masterName the master address; ':' is replaced by '_' to form the entry name
+   * @param conf       the Tajo configuration used to locate the HA directories
+   * @return {@code 0} if the master is listed in the backup directory, {@code 1} if it
+   *         is the single entry of the active directory, {@code -2} if it is in
+   *         neither, and {@code -1} if the lookup threw an exception
+   */
+  public static int getState(String masterName, TajoConf conf) {
+    String encodedName = masterName.replaceAll(":", "_");
+
+    try {
+      FileSystem fileSystem = getFileSystem(conf);
+      Path activeDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Backup masters are examined first.
+      for (FileStatus backupEntry : fileSystem.listStatus(backupDir)) {
+        if (backupEntry.getPath().getName().equals(encodedName)) {
+          return 0;
+        }
+      }
+
+      // The active directory only counts when it holds exactly one entry.
+      FileStatus[] activeEntries = fileSystem.listStatus(activeDir);
+      if (activeEntries.length == 1 && activeEntries[0].getPath().getName().equals(encodedName)) {
+        return 1;
+      }
+
+      return -2;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return -1;
+    }
+  }
+
+  /**
+   * Deletes the HA directory, but only when none of the registered masters responds.
+   *
+   * <p>Every entry of the backup directory, and the active entry when there is exactly
+   * one, is probed with {@link #isMasterAlive(String, TajoConf)}.
+   *
+   * @param conf the Tajo configuration used to locate the HA directories
+   * @return {@code 0} if at least one master is alive (nothing is deleted), {@code 1}
+   *         after the delete call has been issued, and {@code -1} if an exception
+   *         occurred
+   */
+  public static int formatHA(TajoConf conf) {
+    try {
+      FileSystem fileSystem = getFileSystem(conf);
+      Path activeDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      boolean anyMasterAlive = false;
+
+      // Probe every backup master; all of them are checked even after one responds.
+      for (FileStatus backupEntry : fileSystem.listStatus(backupDir)) {
+        String backupAddress = backupEntry.getPath().getName().replaceAll("_", ":");
+        if (isMasterAlive(backupAddress, conf)) {
+          anyMasterAlive = true;
+        }
+      }
+
+      // Probe the active master when exactly one is registered.
+      FileStatus[] activeEntries = fileSystem.listStatus(activeDir);
+      if (activeEntries.length == 1) {
+        String activeAddress = activeEntries[0].getPath().getName().replaceAll("_", ":");
+        if (isMasterAlive(activeAddress, conf)) {
+          anyMasterAlive = true;
+        }
+      }
+
+      // The storage must not be formatted while any master is still running.
+      if (anyMasterAlive) {
+        return 0;
+      }
+
+      fileSystem.delete(TajoConf.getSystemHADir(conf), true);
+      return 1;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return -1;
+    }
+  }
+
+
+  /**
+   * Lists the addresses of all masters registered in the HA directories.
+   *
+   * <p>Backup masters come first, followed by the active master when the active
+   * directory holds exactly one entry. Entry names are turned back into addresses by
+   * replacing '_' with ':'. If an exception occurs, its stack trace is printed and
+   * the addresses collected so far are returned.
+   *
+   * @param conf the Tajo configuration used to locate the HA directories
+   * @return the collected master addresses; never {@code null}
+   */
+  public static List<String> getMasters(TajoConf conf) {
+    List<String> masters = new ArrayList<String>();
+
+    try {
+      FileSystem fileSystem = getFileSystem(conf);
+      Path activeDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Backup masters
+      for (FileStatus backupEntry : fileSystem.listStatus(backupDir)) {
+        masters.add(backupEntry.getPath().getName().replaceAll("_", ":"));
+      }
+
+      // Active master
+      FileStatus[] activeEntries = fileSystem.listStatus(activeDir);
+      if (activeEntries.length == 1) {
+        masters.add(activeEntries[0].getPath().getName().replaceAll("_", ":"));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return masters;
+  }
+
+  /**
+   * Returns the file system that owns the Tajo root directory configured in {@code conf}.
+   *
+   * @throws IOException if the file system cannot be obtained
+   */
+  private static FileSystem getFileSystem(TajoConf conf) throws IOException {
+    return TajoConf.getTajoRootDir(conf).getFileSystem(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7f1eac6..cb7861c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -34,6 +35,7 @@ import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.util.HAServiceUtil;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -160,9 +162,29 @@ public class TajoContainerProxy extends ContainerProxy {
     RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
     NettyClientBase tmClient = null;
     try {
+      // In TajoMaster HA mode, a backup master may have become the active master,
+      // so the worker can fail to connect to the previously active master. In that
+      // case the worker looks up the current master addresses, updates them in the
+      // worker context, and retries the connection.
+      TajoConf conf = context.getConf();
+      if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+        try {
+          tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
+        } catch (Exception e) {
+          context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
+              HAServiceUtil.getResourceTrackerAddress(conf));
+          context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
+              HAServiceUtil.getMasterUmbilicalAddress(conf));
+          tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
+        }
+      } else {
         tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
             TajoMasterProtocol.class, true);
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+      }
+
+      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,
           TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
               .setExecutionBlockId(executionBlockId.getProto())

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 0962ca5..4e51460 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -45,6 +45,8 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.annotation.Description;
 import org.apache.tajo.engine.function.annotation.ParamOptionTypes;
 import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.master.ha.HAService;
+import org.apache.tajo.master.ha.HAServiceHDFSImpl;
 import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet;
 import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.master.querymaster.QueryJobManager;
@@ -130,6 +132,8 @@ public class TajoMaster extends CompositeService {
 
   private TajoSystemMetrics systemMetrics;
 
+  private HAService haService;
+
   public TajoMaster() throws Exception {
     super(TajoMaster.class.getName());
   }
@@ -219,6 +223,20 @@ public class TajoMaster extends CompositeService {
     }
   }
 
+
+  /**
+   * Creates and registers the HA service when {@code TAJO_MASTER_HA_ENABLE} is set;
+   * otherwise {@code haService} is left untouched.
+   *
+   * @throws Exception if creating or registering the HA service fails
+   */
+  private void initHAManger() throws Exception {
+    // Only the HDFS-based implementation exists; if a ZooKeeper-based HAService is
+    // added later, the implementation selection below needs to be updated.
+    if (systemConf.getBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      haService = new HAServiceHDFSImpl(context);
+      haService.register();
+    }
+  }
+
+  /**
+   * Tells whether this master is currently the active one.
+   *
+   * @return {@code true} when no HA service has been created; otherwise the value
+   *         reported by the HA service
+   */
+  public boolean isActiveMaster() {
+    if (haService == null) {
+      return true;
+    }
+    return haService.isActiveStatus();
+  }
+
+
   private void checkAndInitializeSystemDirectories() throws IOException {
     // Get Tajo root dir
     this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
@@ -362,6 +380,12 @@ public class TajoMaster extends CompositeService {
     }
 
     initSystemMetrics();
+
+    try {
+      initHAManger();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
   }
 
   private void writeSystemConf() throws IOException {
@@ -402,6 +426,14 @@ public class TajoMaster extends CompositeService {
 
   @Override
   public void stop() {
+    if (haService != null) {
+      try {
+        haService.delete();
+      } catch (Exception e) {
+        LOG.error(e);
+      }
+    }
+
     if (webServer != null) {
       try {
         webServer.stop();
@@ -492,6 +524,10 @@ public class TajoMaster extends CompositeService {
     public TajoSystemMetrics getSystemMetrics() {
       return systemMetrics;
     }
+
+    public HAService getHAService() {
+      return haService;
+    }
   }
 
   String getThreadTaskName(long id, String name) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
new file mode 100644
index 0000000..3d6669c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The HAService is responsible for setting the active TajoMaster on startup or when
+ * the current active master changes (e.g. due to a failure), and for monitoring the
+ * health of the TajoMaster.
+ */
+public interface HAService {
+
+  /**
+   * Adds this master's name to the shared storage.
+   *
+   * @throws IOException if the shared storage cannot be updated
+   */
+  public void register() throws IOException;
+
+
+  /**
+   * Deletes this master's name from the shared storage.
+   *
+   * @throws IOException if the shared storage cannot be updated
+   */
+  public void delete() throws IOException;
+
+  /**
+   * Tells whether the current master is the active master.
+   *
+   * @return {@code true} if the current master is an active master
+   */
+  public boolean isActiveStatus();
+
+  /**
+   * Lists all registered masters.
+   *
+   * @return the list of all masters
+   * @throws IOException if the master list cannot be read
+   */
+  public List<TajoMasterInfo> getMasters() throws IOException;
+
+}


Mime
View raw message