tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] tajo git commit: TAJO-1306: HAServiceUtil should not directly use HDFS.
Date Wed, 28 Jan 2015 17:56:24 GMT
TAJO-1306: HAServiceUtil should not directly use HDFS.

Closes #358


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4595375f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4595375f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4595375f

Branch: refs/heads/master
Commit: 4595375f7e6b62436e0d4bf88a8aef1ca680c726
Parents: 015913b
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Jan 28 09:23:20 2015 -0800
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Jan 28 09:23:20 2015 -0800

----------------------------------------------------------------------
 .../tajo/catalog/AbstractCatalogClient.java     |  12 +-
 .../apache/tajo/client/DummyServiceTracker.java |  84 +++
 .../apache/tajo/client/SessionConnection.java   |  44 +-
 .../org/apache/tajo/client/TajoClientImpl.java  |  38 +-
 .../apache/tajo/client/TajoHAClientUtil.java    |  14 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   7 +-
 .../java/org/apache/tajo/ha/HAServiceUtil.java  |  39 --
 .../apache/tajo/service/BaseServiceTracker.java |  97 ++++
 .../apache/tajo/service/HAServiceTracker.java   |  48 ++
 .../org/apache/tajo/service/ServiceTracker.java |  63 ++
 .../tajo/service/ServiceTrackerException.java   |  30 +
 .../tajo/service/ServiceTrackerFactory.java     |  41 ++
 .../org/apache/tajo/service/TajoMasterInfo.java |  89 +++
 .../org/apache/tajo/benchmark/BenchmarkSet.java |  15 +-
 .../main/java/org/apache/tajo/ha/HAService.java |  56 --
 .../org/apache/tajo/ha/HAServiceHDFSImpl.java   | 316 ----------
 .../org/apache/tajo/ha/HdfsServiceTracker.java  | 576 +++++++++++++++++++
 .../java/org/apache/tajo/ha/TajoMasterInfo.java |  89 ---
 .../apache/tajo/master/TajoContainerProxy.java  |  27 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  30 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  66 +--
 .../main/java/org/apache/tajo/util/JSPUtil.java |  12 +-
 .../tajo/worker/TajoResourceAllocator.java      |  28 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  46 +-
 .../tajo/worker/TajoWorkerClientService.java    |   2 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  20 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  11 +-
 .../resources/webapps/admin/catalogview.jsp     |   5 +-
 .../main/resources/webapps/admin/cluster.jsp    |   7 +-
 .../src/main/resources/webapps/admin/index.jsp  |   7 +-
 .../resources/webapps/admin/query_executor.jsp  |   5 +-
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |  10 +-
 .../org/apache/tajo/jdbc/JdbcConnection.java    |  10 +-
 .../org/apache/tajo/jdbc/TajoStatement.java     |   4 +-
 .../org/apache/tajo/storage/StorageUtil.java    |  16 -
 35 files changed, 1156 insertions(+), 808 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 1a2fd44..718f7d6 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,12 +29,13 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.net.InetSocketAddress;
@@ -48,6 +49,7 @@ import java.util.List;
 public abstract class AbstractCatalogClient implements CatalogService {
   private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
 
+  protected ServiceTracker serviceTracker;
   protected RpcConnectionPool pool;
   protected InetSocketAddress catalogServerAddr;
   protected TajoConf conf;
@@ -57,6 +59,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
     this.pool = RpcConnectionPool.getPool();
     this.catalogServerAddr = catalogServerAddr;
+    this.serviceTracker = ServiceTrackerFactory.get(conf);
     this.conf = conf;
   }
 
@@ -64,14 +67,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
     if (catalogServerAddr == null) {
       return null;
     } else {
+
       if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         return catalogServerAddr;
       } else {
-        if (!HAServiceUtil.isMasterAlive(catalogServerAddr, conf)) {
-          return HAServiceUtil.getCatalogAddress(conf);
-        } else {
-          return catalogServerAddr;
-        }
+        return serviceTracker.getCatalogAddress();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
new file mode 100644
index 0000000..762c2e7
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class DummyServiceTracker implements ServiceTracker {
+  private InetSocketAddress address;
+
+  public DummyServiceTracker(InetSocketAddress address) {
+    this.address = address;
+  }
+
+  @Override
+  public boolean isHighAvailable() {
+    return false;
+  }
+
+  @Override
+  public InetSocketAddress getUmbilicalAddress() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public InetSocketAddress getClientServiceAddress() {
+    return address;
+  }
+
+  @Override
+  public InetSocketAddress getResourceTrackerAddress() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public InetSocketAddress getCatalogAddress() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public void register() throws IOException {
+  }
+
+  @Override
+  public void delete() throws IOException {
+  }
+
+  @Override
+  public boolean isActiveStatus() {
+    return true;
+  }
+
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    throw new UnsupportedException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 5490be4..3e2b9cc 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,12 +21,10 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.auth.UserRoleInfo;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -34,6 +32,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
@@ -58,8 +57,6 @@ public class SessionConnection implements Closeable {
 
   private final TajoConf conf;
 
-  final InetSocketAddress tajoMasterAddr;
-
   final RpcConnectionPool connPool;
 
   private final String baseDatabase;
@@ -73,41 +70,29 @@ public class SessionConnection implements Closeable {
   /** session variable cache */
   private final Map<String, String> sessionVarsCache = new HashMap<String, String>();
 
-
-  public SessionConnection(TajoConf conf) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
-  }
-
-  public SessionConnection(TajoConf conf, @Nullable String baseDatabase) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
-  }
-
-  public SessionConnection(InetSocketAddress addr) throws IOException {
-    this(new TajoConf(), addr, null);
-  }
-
-  public SessionConnection(String hostname, int port, String baseDatabase) throws IOException {
-    this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase);
-  }
+  private ServiceTracker serviceTracker;
 
   /**
    * Connect to TajoMaster
    *
    * @param conf TajoConf
-   * @param addr TajoMaster address
+   * @param tracker ServiceTracker used to look up the TajoMaster client service address
    * @param baseDatabase The base database name. It is case sensitive. If it is null,
    *                     the 'default' database will be used.
    * @throws java.io.IOException
    */
-  public SessionConnection(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
+  public SessionConnection(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase)
+      throws IOException {
+
     this.conf = conf;
     this.conf.set("tajo.disk.scheduler.report.interval", "0");
-    this.tajoMasterAddr = addr;
     int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
     // Don't share connection pool per client
     connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
     userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
+
+    this.serviceTracker = tracker;
   }
 
   public Map<String, String> getClientSideSessionVars() {
@@ -140,7 +125,8 @@ public class SessionConnection implements Closeable {
   public boolean isConnected() {
     if(!closed.get()){
       try {
-        return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
+        return connPool.getConnection(serviceTracker.getClientServiceAddress(),
+            TajoMasterClientProtocol.class, false).isConnected();
       } catch (Throwable e) {
         return false;
       }
@@ -309,15 +295,7 @@ public class SessionConnection implements Closeable {
   }
 
   protected InetSocketAddress getTajoMasterAddr() {
-    if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      return tajoMasterAddr;
-    } else {
-      if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
-        return HAServiceUtil.getMasterClientAddress(conf);
-      } else {
-        return tajoMasterAddr;
-      }
-    }
+    return serviceTracker.getClientServiceAddress();
   }
 
   protected void checkSessionAndGet(NettyClientBase client) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 8eafc91..f8eef28 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -32,8 +32,6 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.jdbc.TajoResultSet;
@@ -41,7 +39,8 @@ import org.apache.tajo.rule.EvaluationContext;
 import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
-import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -56,41 +55,30 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
   QueryClient queryClient;
   CatalogAdminClient catalogClient;
 
-  public TajoClientImpl(TajoConf conf) throws IOException {
-    this(conf, TajoHAClientUtil.getRpcClientAddress(conf), null);
-  }
-
-  public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
-    this(conf, TajoHAClientUtil.getRpcClientAddress(conf), baseDatabase);
-  }
-
-  public TajoClientImpl(InetSocketAddress addr) throws IOException {
-    this(new TajoConf(), addr, null);
-  }
-
   /**
    * Connect to TajoMaster
    *
    * @param conf TajoConf
-   * @param addr TajoMaster address
+   * @param tracker ServiceTracker used to discover the Tajo client RPC address
    * @param baseDatabase The base database name. It is case sensitive. If it is null,
    *                     the 'default' database will be used.
    * @throws java.io.IOException
    */
-  public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
-    super(conf, addr, baseDatabase);
+  public TajoClientImpl(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) throws IOException {
+    super(conf, tracker, baseDatabase);
+
     this.queryClient = new QueryClientImpl(this);
     this.catalogClient = new CatalogAdminClientImpl(this);
-    
+
     diagnoseTajoClient();
   }
 
-  public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException {
-    super(hostName, port, baseDatabase);
-    this.queryClient = new QueryClientImpl(this);
-    this.catalogClient = new CatalogAdminClientImpl(this);
-    
-    diagnoseTajoClient();
+  public TajoClientImpl(TajoConf conf) throws IOException {
+    this(conf, ServiceTrackerFactory.get(conf), null);
+  }
+
+  public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
+    this(conf, ServiceTrackerFactory.get(conf), baseDatabase);
   }
   
   private void diagnoseTajoClient() throws EvaluationFailedException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
index 12a9ec8..7267b10 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -40,10 +40,8 @@ import com.google.protobuf.ServiceException;
 import org.apache.tajo.cli.tsql.TajoCli.TajoCliContext;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 public class TajoHAClientUtil {
   /**
@@ -65,6 +63,7 @@ public class TajoHAClientUtil {
       TajoCliContext context) throws IOException, ServiceException {
 
     if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+
       if (!HAServiceUtil.isMasterAlive(conf.getVar(
         TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
         TajoClient tajoClient = null;
@@ -85,15 +84,4 @@ public class TajoHAClientUtil {
       return client;
     }
   }
-
-
-  public static InetSocketAddress getRpcClientAddress(TajoConf conf) {
-    if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      return NetUtils.createSocketAddr(HAServiceUtil.getMasterClientName(conf));
-    } else {
-      return NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
-        .TAJO_MASTER_CLIENT_RPC_ADDRESS));
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1bb96bc..fe5ff54 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ConfigKey;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.service.BaseServiceTracker;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.NumberUtil;
 import org.apache.tajo.util.TUtil;
@@ -134,10 +135,14 @@ public class TajoConf extends Configuration {
         Validators.networkAddr()),
     TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()),
 
-    // Tajo Master HA Configurations
+    // High availability configurations
     TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
     TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
 
+    // Service discovery
+    DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()),
+    HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"),
+
     // Resource tracker service
     RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003",
         Validators.networkAddr()),

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
index 52c2ade..7001228 100644
--- a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.ha;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
@@ -34,15 +32,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class HAServiceUtil {
-  private static Log LOG = LogFactory.getLog(HAServiceUtil.class);
-
-  public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.MASTER_UMBILICAL_RPC_ADDRESS);
-  }
-
-  public static String getMasterUmbilicalName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
-  }
 
   public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
     return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS);
@@ -52,30 +41,6 @@ public class HAServiceUtil {
     return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
   }
 
-  public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
-  }
-
-  public static String getResourceTrackerName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
-  }
-
-  public static InetSocketAddress getCatalogAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.CATALOG_ADDRESS);
-  }
-
-  public static String getCatalogName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
-  }
-
-  public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.MASTER_INFO_ADDRESS);
-  }
-
-  public static String getMasterInfoName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
-  }
-
   public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
     InetSocketAddress masterAddress = null;
 
@@ -153,10 +118,6 @@ public class HAServiceUtil {
     return masterAddress;
   }
 
-  public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
-    return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
-  }
-
   public static boolean isMasterAlive(String masterName, TajoConf conf) {
     boolean isAlive = true;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
new file mode 100644
index 0000000..bf7fd2c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class BaseServiceTracker implements ServiceTracker {
+  private final TajoConf conf;
+  private TajoMasterInfo tajoMasterInfo;
+  private List<TajoMasterInfo> tajoMasterInfos;
+
+  @SuppressWarnings("unused")
+  public BaseServiceTracker(TajoConf conf) {
+    this.conf = conf;
+
+    tajoMasterInfo = new TajoMasterInfo();
+    tajoMasterInfo.setActive(true);
+    tajoMasterInfo.setAvailable(true);
+    tajoMasterInfo.setTajoMasterAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+    tajoMasterInfo.setTajoClientAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+    tajoMasterInfo.setWorkerResourceTrackerAddr(conf.getSocketAddrVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+    tajoMasterInfo.setCatalogAddress(conf.getSocketAddrVar(TajoConf.ConfVars.CATALOG_ADDRESS));
+    tajoMasterInfo.setWebServerAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS));
+
+    tajoMasterInfos = TUtil.newList(tajoMasterInfo);
+  }
+
+  @Override
+  public boolean isHighAvailable() {
+    return false;
+  }
+
+  @Override
+  public InetSocketAddress getUmbilicalAddress() {
+    return tajoMasterInfo.getTajoMasterAddress();
+  }
+
+  @Override
+  public InetSocketAddress getClientServiceAddress() {
+    return tajoMasterInfo.getTajoClientAddress();
+  }
+
+  @Override
+  public InetSocketAddress getResourceTrackerAddress() {
+    return tajoMasterInfo.getWorkerResourceTrackerAddr();
+  }
+
+  @Override
+  public InetSocketAddress getCatalogAddress() {
+    return tajoMasterInfo.getCatalogAddress();
+  }
+
+  @Override
+  public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+    return tajoMasterInfo.getWebServerAddress();
+  }
+
+  @Override
+  public void register() throws IOException {
+  }
+
+  @Override
+  public void delete() throws IOException {
+  }
+
+  @Override
+  public boolean isActiveStatus() {
+    return true;
+  }
+
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    return tajoMasterInfos;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
new file mode 100644
index 0000000..c808537
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.hadoop.net.NetUtils;
+
+import javax.net.SocketFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public abstract class HAServiceTracker implements ServiceTracker {
+
+  static SocketFactory socketFactory = SocketFactory.getDefault();
+
+  public boolean isHighAvailable() {
+    return true;
+  }
+
+  public static boolean checkConnection(InetSocketAddress address) {
+    boolean isAlive = true;
+
+    try {
+      int connectionTimeout = 10;
+
+      Socket socket = socketFactory.createSocket();
+      NetUtils.connect(socket, address, connectionTimeout);
+    } catch (Exception e) {
+      isAlive = false;
+    }
+    return isAlive;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
new file mode 100644
index 0000000..73ff112
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public interface ServiceTracker {
+
+  public abstract boolean isHighAvailable();
+
+  public abstract InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException;
+
+  public abstract InetSocketAddress getClientServiceAddress() throws ServiceTrackerException;
+
+  public abstract InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException;
+
+  public abstract InetSocketAddress getCatalogAddress() throws ServiceTrackerException;
+
+  public abstract InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException;
+
+  /**
+   * Add master name to shared storage.
+   */
+  public void register() throws IOException;
+
+
+  /**
+   * Delete master name from shared storage.
+   *
+   */
+  public void delete() throws IOException;
+
+  /**
+   *
+   * @return True if current master is an active master.
+   */
+  public boolean isActiveStatus();
+
+  /**
+   *
+   * @return the list of all masters
+   * @throws IOException
+   */
+  public List<TajoMasterInfo> getMasters() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
new file mode 100644
index 0000000..3407c51
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+public class ServiceTrackerException extends RuntimeException {
+  // Unchecked exception that the ServiceTracker address getters list in their throws clauses.
+  public ServiceTrackerException(Throwable t) {
+    super(t);
+  }
+
+  public ServiceTrackerException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
new file mode 100644
index 0000000..5828055
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.ReflectionUtil;
+
+public class ServiceTrackerFactory {
+
+  /** Instantiates the HA tracker class when master HA is enabled, otherwise the default tracker class. */
+  public static ServiceTracker get(TajoConf conf) {
+    Class<ServiceTracker> trackerClass;
+    try {
+      if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+        trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.HA_SERVICE_TRACKER_CLASS);
+      } else {
+        trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.DEFAULT_SERVICE_TRACKER_CLASS);
+      }
+      return ReflectionUtil.newInstance(trackerClass, conf);
+    } catch (Exception e) {
+      // Only exceptions are wrapped; JVM Errors (e.g. OutOfMemoryError) propagate unchanged.
+      throw new ServiceTrackerException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
new file mode 100644
index 0000000..481b528
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.net.InetSocketAddress;
+
+public class TajoMasterInfo {
+  // Status flags; both are false until changed through the setters.
+  private boolean available;
+  private boolean isActive;
+  // Service addresses of the master; each stays null until its setter is called.
+  private InetSocketAddress tajoMasterAddress;
+  private InetSocketAddress tajoClientAddress;
+  private InetSocketAddress workerResourceTrackerAddr;
+  private InetSocketAddress catalogAddress;
+  private InetSocketAddress webServerAddress;
+
+  public InetSocketAddress getTajoMasterAddress() {
+    return tajoMasterAddress;
+  }
+
+  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
+    this.tajoMasterAddress = tajoMasterAddress;
+  }
+
+  public InetSocketAddress getTajoClientAddress() {
+    return tajoClientAddress;
+  }
+
+  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
+    this.tajoClientAddress = tajoClientAddress;
+  }
+
+  public InetSocketAddress getWorkerResourceTrackerAddr() {
+    return workerResourceTrackerAddr;
+  }
+
+  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
+    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
+  }
+
+  public InetSocketAddress getCatalogAddress() {
+    return catalogAddress;
+  }
+
+  public void setCatalogAddress(InetSocketAddress catalogAddress) {
+    this.catalogAddress = catalogAddress;
+  }
+
+  public InetSocketAddress getWebServerAddress() {
+    return webServerAddress;
+  }
+
+  public void setWebServerAddress(InetSocketAddress webServerAddress) {
+    this.webServerAddress = webServerAddress;
+  }
+
+  public boolean isAvailable() {
+    return available;
+  }
+
+  public void setAvailable(boolean available) {
+    this.available = available;
+  }
+
+  public boolean isActive() {
+    return isActive;
+  }
+
+  public void setActive(boolean active) {
+    isActive = active;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index b1b6450..0304e92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -18,19 +18,23 @@
 
 package org.apache.tajo.benchmark;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.catalog.CatalogConstants;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.store.MemStore;
+import org.apache.tajo.client.DummyServiceTracker;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.FileUtil;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,9 +47,14 @@ public abstract class BenchmarkSet {
 
   public void init(TajoConf conf, String dataDir) throws IOException {
     this.dataDir = dataDir;
-    if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
-      tajo = new TajoClientImpl(NetUtils.createSocketAddr(
-          System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
+
+    if (System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname) != null) {
+
+      String addressStr = System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname);
+      InetSocketAddress addr = NetUtils.createSocketAddr(addressStr);
+      ServiceTracker serviceTracker = new DummyServiceTracker(addr);
+      tajo = new TajoClientImpl(conf, serviceTracker, null);
+
     } else {
       conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
       tajo = new TajoClientImpl(conf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
deleted file mode 100644
index 1329223..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The HAService is responsible for setting active TajoMaster on startup or when the
- * current active is changing (eg due to failure), monitoring the health of TajoMaster.
- *
- */
-public interface HAService {
-
-  /**
-   * Add master name to shared storage.
-   */
-  public void register() throws IOException;
-
-
-  /**
-   * Delete master name to shared storage.
-   *
-   */
-  public void delete() throws IOException;
-
-  /**
-   *
-   * @return True if current master is an active master.
-   */
-  public boolean isActiveStatus();
-
-  /**
-   *
-   * @return return all master list
-   * @throws IOException
-   */
-  public List<TajoMasterInfo> getMasters() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
deleted file mode 100644
index e18a9b2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
- *
- */
-public class HAServiceHDFSImpl implements HAService {
-  private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
-
-  private MasterContext context;
-  private TajoConf conf;
-
-  private FileSystem fs;
-
-  private String masterName;
-  private Path rootPath;
-  private Path haPath;
-  private Path activePath;
-  private Path backupPath;
-
-  private boolean isActiveStatus = false;
-
-  //thread which runs periodically to see the last time since a heartbeat is received.
-  private Thread checkerThread;
-  private volatile boolean stopped = false;
-
-  private int monitorInterval;
-
-  private String currentActiveMaster;
-
-  public HAServiceHDFSImpl(MasterContext context) throws IOException {
-    this.context = context;
-    this.conf = context.getConf();
-    initSystemDirectory();
-
-    InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
-    this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
-
-    monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
-  }
-
-  private void initSystemDirectory() throws IOException {
-    // Get Tajo root dir
-    this.rootPath = TajoConf.getTajoRootDir(conf);
-
-    // Check Tajo root dir
-    this.fs = rootPath.getFileSystem(conf);
-
-    // Check and create Tajo system HA dir
-    haPath = TajoConf.getSystemHADir(conf);
-    if (!fs.exists(haPath)) {
-      fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA dir '" + haPath + "' is created");
-    }
-
-    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-    if (!fs.exists(activePath)) {
-      fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA Active dir '" + activePath + "' is created");
-    }
-
-    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
-    if (!fs.exists(backupPath)) {
-      fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA Backup dir '" + backupPath + "' is created");
-    }
-  }
-
-  private void startPingChecker() {
-    if (checkerThread == null) {
-      checkerThread = new Thread(new PingChecker());
-      checkerThread.setName("Ping Checker");
-      checkerThread.start();
-    }
-  }
-
-  @Override
-  public void register() throws IOException {
-    FileStatus[] files = fs.listStatus(activePath);
-
-    // Phase 1: If there is not another active master, this try to become active master.
-    if (files.length == 0) {
-      createMasterFile(true);
-      currentActiveMaster = masterName;
-      LOG.info(String.format("This is added to active master (%s)", masterName));
-    } else {
-      // Phase 2: If there is active master information, we need to check its status.
-      Path activePath = files[0].getPath();
-      currentActiveMaster = activePath.getName().replaceAll("_", ":");
-
-      // Phase 3: If current active master is dead, this master should be active master.
-      if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
-        fs.delete(activePath, true);
-        createMasterFile(true);
-        currentActiveMaster = masterName;
-        LOG.info(String.format("This is added to active master (%s)", masterName));
-      } else {
-        // Phase 4: If current active master is alive, this master need to be backup master.
-        createMasterFile(false);
-        LOG.info(String.format("This is added to backup masters (%s)", masterName));
-      }
-    }
-  }
-
-  private void createMasterFile(boolean isActive) throws IOException {
-    String fileName = masterName.replaceAll(":", "_");
-    Path path = null;
-
-    if (isActive) {
-      path = new Path(activePath, fileName);
-    } else {
-      path = new Path(backupPath, fileName);
-    }
-
-    StringBuilder sb = new StringBuilder();
-    InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.CATALOG_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
-
-    FSDataOutputStream out = fs.create(path);
-
-    try {
-      out.writeUTF(sb.toString());
-      out.hflush();
-      out.close();
-    } catch (FileAlreadyExistsException e) {
-      createMasterFile(false);
-    }
-
-    if (isActive) {
-      isActiveStatus = true;
-    } else {
-      isActiveStatus = false;
-    }
-
-    startPingChecker();
-  }
-
-
-  private InetSocketAddress getHostAddress(int type) {
-    InetSocketAddress address = null;
-
-    switch (type) {
-      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
-        break;
-      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .TAJO_MASTER_CLIENT_RPC_ADDRESS);
-        break;
-      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .RESOURCE_TRACKER_RPC_ADDRESS);
-        break;
-      case HAConstants.CATALOG_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .CATALOG_ADDRESS);
-        break;
-      case HAConstants.MASTER_INFO_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-        .TAJO_MASTER_INFO_ADDRESS);
-      default:
-        break;
-    }
-
-    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
-  }
-
-  @Override
-  public void delete() throws IOException {
-    String fileName = masterName.replaceAll(":", "_");
-
-    Path activeFile = new Path(activePath, fileName);
-    if (fs.exists(activeFile)) {
-      fs.delete(activeFile, true);
-    }
-
-    Path backupFile = new Path(backupPath, fileName);
-    if (fs.exists(backupFile)) {
-      fs.delete(backupFile, true);
-    }
-    if (isActiveStatus) {
-      isActiveStatus = false;
-    }
-    stopped = true;
-  }
-
-  @Override
-  public boolean isActiveStatus() {
-    return isActiveStatus;
-  }
-
-  @Override
-  public List<TajoMasterInfo> getMasters() throws IOException {
-    List<TajoMasterInfo> list = TUtil.newList();
-    Path path = null;
-
-    FileStatus[] files = fs.listStatus(activePath);
-    if (files.length == 1) {
-      path = files[0].getPath();
-      list.add(createTajoMasterInfo(path, true));
-    }
-
-    files = fs.listStatus(backupPath);
-    for (FileStatus status : files) {
-      path = status.getPath();
-      list.add(createTajoMasterInfo(path, false));
-    }
-
-    return list;
-  }
-
-  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
-    String masterAddress = path.getName().replaceAll("_", ":");
-    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
-
-    FSDataInputStream stream = fs.open(path);
-    String data = stream.readUTF();
-
-    stream.close();
-
-    String[] addresses = data.split("_");
-    TajoMasterInfo info = new TajoMasterInfo();
-
-    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
-    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
-    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
-    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
-    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
-
-    info.setAvailable(isAlive);
-    info.setActive(isActive);
-
-    return info;
-  }
-
-  private class PingChecker implements Runnable {
-    @Override
-    public void run() {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        synchronized (HAServiceHDFSImpl.this) {
-          try {
-            if (!currentActiveMaster.equals(masterName)) {
-              boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
-                  + ", isAlive:" + isAlive);
-              }
-
-              // If active master is dead, this master should be active master instead of
-              // previous active master.
-              if (!isAlive) {
-                FileStatus[] files = fs.listStatus(activePath);
-                if (files.length == 0 || (files.length ==  1
-                  && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
-                  delete();
-                  register();
-                }
-              }
-            }
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-        try {
-          Thread.sleep(monitorInterval);
-        } catch (InterruptedException e) {
-          LOG.info("PingChecker interrupted. - masterName:" + masterName);
-          break;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
new file mode 100644
index 0000000..1475a5d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.service.HAServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+import org.apache.tajo.util.TUtil;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This implements HAServiceTracker utilizing HDFS cluster. This saves master status to HDFS cluster.
+ *
+ */
+@SuppressWarnings("unused")
+public class HdfsServiceTracker extends HAServiceTracker {
+  private static Log LOG = LogFactory.getLog(HdfsServiceTracker.class);
+
+  private TajoConf conf;
+
+  private FileSystem fs;
+
+  private String masterName;
+  private Path rootPath;
+  private Path haPath;
+  private Path activePath;
+  private Path backupPath;
+
+  private boolean isActiveStatus = false;
+
+  // Thread which periodically checks whether the current active master is still alive.
+  private Thread checkerThread;
+  private volatile boolean stopped = false;
+
+  private int monitorInterval;
+
+  private String currentActiveMaster;
+
+  public HdfsServiceTracker(TajoConf conf) throws IOException {
+    this.conf = conf;
+    initSystemDirectory();
+    // Identify this master by the IP and port of the configured umbilical RPC address.
+    InetSocketAddress socketAddress = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+    this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
+    // Sleep time of the ping checker between two checks (passed to Thread.sleep()).
+    monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+  }
+
+  /**
+   * Resolves the file system of the Tajo root dir, then makes sure that the HA dir and its
+   * active and backup sub-directories exist.
+   */
+  private void initSystemDirectory() throws IOException {
+    this.rootPath = TajoConf.getTajoRootDir(conf);
+    this.fs = rootPath.getFileSystem(conf);
+
+    haPath = TajoConf.getSystemHADir(conf);
+    createIfMissing(haPath, "System HA dir '");
+
+    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+    createIfMissing(activePath, "System HA Active dir '");
+
+    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+    createIfMissing(backupPath, "System HA Backup dir '");
+  }
+
+  /** Creates the directory with the system resource permission and logs it, unless it exists. */
+  private void createIfMissing(Path dir, String logPrefix) throws IOException {
+    if (fs.exists(dir)) {
+      return;
+    }
+    fs.mkdirs(dir, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+    LOG.info(logPrefix + dir + "' is created");
+  }
+
+  private void startPingChecker() {
+    if (checkerThread != null) {
+      return; // a checker thread was already created; never start a second one
+    }
+    checkerThread = new Thread(new PingChecker(), "Ping Checker");
+    checkerThread.start();
+  }
+
+  /** Registers this master as active unless a live active master is already recorded. */
+  @Override
+  public void register() throws IOException {
+    FileStatus[] activeFiles = fs.listStatus(activePath);
+
+    // Decide the role first: active when nobody is recorded, or when the recorded master is dead.
+    boolean becomeActive;
+    if (activeFiles.length == 0) {
+      becomeActive = true;
+    } else {
+      Path recordedActive = activeFiles[0].getPath();
+      currentActiveMaster = recordedActive.getName().replaceAll("_", ":");
+      becomeActive = !HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+      if (becomeActive) {
+        fs.delete(recordedActive, true); // drop the stale record of the dead master
+      }
+    }
+
+    if (becomeActive) {
+      createMasterFile(true);
+      currentActiveMaster = masterName;
+      LOG.info(String.format("This is added to active master (%s)", masterName));
+    } else {
+      createMasterFile(false);
+      LOG.info(String.format("This is added to backup masters (%s)", masterName));
+    }
+  }
+
+  /**
+   * Writes this master's addresses to a file named after the master, under the active or the
+   * backup directory, and starts the ping checker. The file content has the following form:
+   *
+   * <pre>
+   * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT}
+   * </pre>
+   *
+   * @param isActive true to write under the active directory, false for the backup directory.
+   * @throws IOException if the file cannot be created or written.
+   */
+  private void createMasterFile(boolean isActive) throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+    Path path = new Path(isActive ? activePath : backupPath, fileName);
+    StringBuilder sb = new StringBuilder();
+    InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.CATALOG_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
+
+    FSDataOutputStream out = fs.create(path);
+    boolean closed = false;
+    try {
+      out.writeUTF(sb.toString());
+      out.hsync();
+      out.close();
+      closed = true;
+    } catch (FileAlreadyExistsException e) {
+      // The fallback call records the backup status and starts the checker itself, so return
+      // instead of overwriting isActiveStatus with the now stale isActive value below.
+      createMasterFile(false);
+      return;
+    } finally {
+      if (!closed) {
+        try {
+          out.close(); // best effort, so that a failed write does not leak the stream
+        } catch (IOException ignored) {
+          // keep the original failure (if any) as the one that propagates
+        }
+      }
+    }
+    isActiveStatus = isActive;
+    startPingChecker();
+  }
+
+
+  /** Returns this master's host combined with the port configured for the given HAConstants type. */
+  private InetSocketAddress getHostAddress(int type) {
+    InetSocketAddress address;
+    switch (type) {
+      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+        break;
+      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
+        break;
+      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+        break;
+      case HAConstants.CATALOG_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS);
+        break;
+      case HAConstants.MASTER_INFO_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown address type: " + type);
+    }
+    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
+  }
+
+  /** Removes this master's files from both HA directories and marks the tracker as stopped. */
+  @Override
+  public void delete() throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+
+    // Same lookup in the active directory first, then in the backup directory.
+    for (Path parent : new Path[] {activePath, backupPath}) {
+      Path masterFile = new Path(parent, fileName);
+      if (fs.exists(masterFile)) {
+        fs.delete(masterFile, true);
+      }
+    }
+
+    isActiveStatus = false;
+    // NOTE(review): PingChecker's loop tests this flag and calls delete() before register(),
+    // so the checker ends after its first takeover attempt -- confirm this is intended.
+    stopped = true;
+  }
+
+  @Override
+  public boolean isActiveStatus() {
+    return isActiveStatus; // written by createMasterFile() and reset to false by delete()
+  }
+
+  /**
+   * Lists the active master, when exactly one is recorded, followed by all backup masters.
+   */
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    List<TajoMasterInfo> masters = TUtil.newList();
+
+    // An active entry is reported only if the active directory holds exactly one file.
+    FileStatus[] activeFiles = fs.listStatus(activePath);
+    if (activeFiles.length == 1) {
+      masters.add(createTajoMasterInfo(activeFiles[0].getPath(), true));
+    }
+
+    // Every file under the backup directory describes one backup master.
+    for (FileStatus backupFile : fs.listStatus(backupPath)) {
+      masters.add(createTajoMasterInfo(backupFile.getPath(), false));
+    }
+    return masters;
+  }
+
+  /** Builds the info of the master recorded in the given file, whose name encodes its address. */
+  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+    String masterAddress = path.getName().replaceAll("_", ":");
+    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+
+    String data;
+    FSDataInputStream stream = fs.open(path);
+    try {
+      data = stream.readUTF();
+    } finally {
+      stream.close(); // also runs when readUTF() fails, so the stream is not leaked
+    }
+    String[] addresses = data.split("_");
+    TajoMasterInfo info = new TajoMasterInfo();
+    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
+    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
+    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
+    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
+    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+    info.setAvailable(isAlive);
+    info.setActive(isActive);
+    return info;
+  }
+
+  /**
+   * Periodic task that probes the current active master and, when that master no longer answers,
+   * re-registers this master through {@code delete()} followed by {@code register()}.
+   *
+   * Each round runs while holding the monitor of the enclosing {@code HdfsServiceTracker}; the
+   * thread then sleeps for {@code monitorInterval} milliseconds.
+   */
+  private class PingChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (HdfsServiceTracker.this) {
+          try {
+            if (!currentActiveMaster.equals(masterName)) {
+              boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
+                  + ", isAlive:" + isAlive);
+              }
+
+              // If active master is dead, this master should be active master instead of
+              // previous active master.
+              if (!isAlive) {
+                FileStatus[] files = fs.listStatus(activePath);
+                // Take over only when nobody else has claimed the active slot yet: the active
+                // directory is empty or still holds the entry of the dead master.
+                if (files.length == 0 || (files.length ==  1
+                  && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
+                  // NOTE(review): delete() sets stopped = true; presumably register() resets it,
+                  // otherwise this loop ends after the takeover - confirm against register().
+                  delete();
+                  register();
+                }
+              }
+            }
+          } catch (Exception e) {
+            // A failed round must not kill the monitor; report it through the logger instead of
+            // printing to stderr so that it ends up in the master log.
+            LOG.error("PingChecker failed while checking the active master. - masterName:" + masterName, e);
+          }
+        }
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info("PingChecker interrupted. - masterName:" + masterName);
+          // Restore the interrupt flag so that the owner of this thread can still observe it.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
+  // Positions of the service addresses inside the list returned by getAddressElements():
+  // element 0 comes from the active entry's file name, elements 1-4 from the '_'-separated
+  // file content.
+  private final static int MASTER_UMBILICAL_RPC_ADDRESS = 0;
+  private final static int MASTER_CLIENT_RPC_ADDRESS = 1;
+  private final static int RESOURCE_TRACKER_RPC_ADDRESS = 2;
+  private final static int CATALOG_ADDRESS = 3;
+  private final static int MASTER_HTTP_INFO = 4;
+
+  // Last resolved address of each service. A getter re-reads the active master entry
+  // whenever checkConnection() rejects the cached value.
+  private volatile InetSocketAddress umbilicalRpcAddr;
+  private volatile InetSocketAddress clientRpcAddr;
+  private volatile InetSocketAddress resourceTrackerRpcAddr;
+  private volatile InetSocketAddress catalogAddr;
+  private volatile InetSocketAddress masterHttpInfoAddr;
+
+  @Override
+  public InetSocketAddress getUmbilicalAddress() {
+    if (!checkConnection(umbilicalRpcAddr)) {
+      umbilicalRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_UMBILICAL_RPC_ADDRESS));
+    }
+
+    return umbilicalRpcAddr;
+  }
+
+  @Override
+  public InetSocketAddress getClientServiceAddress() {
+    if (!checkConnection(clientRpcAddr)) {
+      clientRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_CLIENT_RPC_ADDRESS));
+    }
+
+    return clientRpcAddr;
+  }
+
+  @Override
+  public InetSocketAddress getResourceTrackerAddress() {
+    if (!checkConnection(resourceTrackerRpcAddr)) {
+      resourceTrackerRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(RESOURCE_TRACKER_RPC_ADDRESS));
+    }
+
+    return resourceTrackerRpcAddr;
+  }
+
+  @Override
+  public InetSocketAddress getCatalogAddress() {
+    if (!checkConnection(catalogAddr)) {
+      catalogAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(CATALOG_ADDRESS));
+    }
+
+    return catalogAddr;
+  }
+
+  @Override
+  public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+    if (!checkConnection(masterHttpInfoAddr)) {
+      masterHttpInfoAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_HTTP_INFO));
+    }
+
+    return masterHttpInfoAddr;
+  }
+
+  /**
+   * Reads the single entry file in the active master directory and returns all service
+   * addresses of the active master.
+   *
+   * The first element is derived from the entry's file name ('_' replaced by ':'); the remaining
+   * elements come from the file content, a single UTF string whose fields are separated by '_'.
+   *
+   * @param conf configuration used to locate the file system and the HA directory
+   * @return exactly five service addresses, ordered as umbilical, client, resource tracker,
+   *         catalog and HTTP info
+   * @throws ServiceTrackerException if the active directory is missing or is not a directory, if it
+   *         does not contain exactly one regular file, or if the entry cannot be read or parsed
+   */
+  private static List<String> getAddressElements(TajoConf conf) throws ServiceTrackerException {
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activeMasterBaseDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+
+      if (!fs.exists(activeMasterBaseDir)) {
+        throw new ServiceTrackerException("No such active master base path: " + activeMasterBaseDir);
+      }
+      if (!fs.isDirectory(activeMasterBaseDir)) {
+        throw new ServiceTrackerException("Active master base path must be a directory.");
+      }
+
+      FileStatus[] files = fs.listStatus(activeMasterBaseDir);
+
+      if (files.length < 1) {
+        throw new ServiceTrackerException("No active master entry");
+      } else if (files.length > 1) {
+        throw new ServiceTrackerException("Two or more than active master entries.");
+      }
+
+      // We can ensure that there is only one file due to the above assertion.
+      Path activeMasterEntry = files[0].getPath();
+
+      if (!fs.isFile(activeMasterEntry)) {
+        throw new ServiceTrackerException("Active master entry must be a file, but it is a directory.");
+      }
+
+      List<String> addressElements = TUtil.newList();
+
+      addressElements.add(activeMasterEntry.getName().replaceAll("_", ":")); // Add UMBILICAL_RPC_ADDRESS to elements
+
+      String data;
+      FSDataInputStream stream = fs.open(activeMasterEntry);
+      try {
+        data = stream.readUTF();
+      } finally {
+        // Always release the stream, even when reading the entry fails.
+        stream.close();
+      }
+
+      addressElements.addAll(TUtil.newList(data.split("_"))); // Add remains entries to elements
+
+      // ensure the number of entries
+      Preconditions.checkState(addressElements.size() == 5,
+          "Expected 5 service addresses, but found " + addressElements.size());
+
+      return addressElements;
+
+    } catch (ServiceTrackerException e) {
+      // Already carries a precise message; do not bury it inside another wrapper.
+      throw e;
+    } catch (Throwable t) {
+      throw new ServiceTrackerException(t);
+    }
+  }
+
+
+  /**
+   * Convenience overload: normalizes the given socket address to its string form and delegates
+   * to {@link #isMasterAlive(String, TajoConf)}.
+   *
+   * @param masterAddress address of the master to probe
+   * @param conf configuration passed through to the string-based overload
+   * @return the result of the string-based overload
+   */
+  public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
+    return isMasterAlive(org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(masterAddress), conf);
+  }
+
+  /**
+   * Checks whether a master accepts connections by opening a socket to the given address.
+   *
+   * The connection timeout is taken from {@code IPC_CLIENT_CONNECT_TIMEOUT_KEY}. The probe socket
+   * is closed before returning, so repeated liveness checks do not accumulate open sockets.
+   *
+   * @param masterName master address in a form accepted by {@code NetUtils.createSocketAddr}
+   * @param conf configuration providing the socket factory and the connection timeout
+   * @return {@code true} if the connection attempt succeeded; {@code false} if any exception occurred
+   */
+  public static boolean isMasterAlive(String masterName, TajoConf conf) {
+    boolean isAlive = true;
+    Socket socket = null;
+
+    try {
+      // how to create sockets
+      SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
+
+      int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
+          CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
+
+      InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
+
+      // connected socket
+      socket = socketFactory.createSocket();
+      org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
+    } catch (Exception e) {
+      isAlive = false;
+    } finally {
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // The probe result is already known; a failure to close must not change it.
+        }
+      }
+    }
+    return isAlive;
+  }
+
+  /**
+   * Looks up the given master in the HA directories.
+   *
+   * @param masterName master address; ':' is replaced by '_' to form the entry file name
+   * @param conf configuration used to locate the file system and the HA directory
+   * @return 0 if the master has a backup entry, 1 if it is the single active entry,
+   *         -2 if no matching entry exists, and -1 if the lookup failed with an exception
+   */
+  public static int getState(String masterName, TajoConf conf) {
+    String entryName = masterName.replaceAll(":", "_");
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activeDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Backup entries are checked first.
+      for (FileStatus backupEntry : fs.listStatus(backupDir)) {
+        if (backupEntry.getPath().getName().equals(entryName)) {
+          return 0;
+        }
+      }
+
+      // The active directory counts only when it holds exactly one entry.
+      FileStatus[] activeEntries = fs.listStatus(activeDir);
+      if (activeEntries.length == 1 && activeEntries[0].getPath().getName().equals(entryName)) {
+        return 1;
+      }
+
+      return -2;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return -1;
+    }
+  }
+
+  /**
+   * Deletes the HA directory, provided that none of the registered masters is alive.
+   *
+   * @param conf configuration used to locate the file system and the HA directory
+   * @return 1 if the HA directory was deleted, 0 if at least one registered master is still alive,
+   *         and -1 if an exception occurred
+   */
+  public static int formatHA(TajoConf conf) {
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activeDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      int liveMasters = 0;
+
+      // Probe every backup master.
+      for (FileStatus backupEntry : fs.listStatus(backupDir)) {
+        String backupAddress = backupEntry.getPath().getName().replaceAll("_", ":");
+        if (isMasterAlive(backupAddress, conf)) {
+          liveMasters++;
+        }
+      }
+
+      // Probe the active master; the directory counts only when it holds exactly one entry.
+      FileStatus[] activeEntries = fs.listStatus(activeDir);
+      if (activeEntries.length == 1) {
+        String activeAddress = activeEntries[0].getPath().getName().replaceAll("_", ":");
+        if (isMasterAlive(activeAddress, conf)) {
+          liveMasters++;
+        }
+      }
+
+      // If there is any alive master, users can't format storage.
+      if (liveMasters > 0) {
+        return 0;
+      }
+
+      fs.delete(TajoConf.getSystemHADir(conf), true);
+      return 1;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return -1;
+    }
+  }
+
+
+  /**
+   * Collects the addresses of all masters registered in the HA directories.
+   *
+   * @param conf configuration used to locate the file system and the HA directory
+   * @return backup master addresses followed by the active master address (if exactly one active
+   *         entry exists); when an exception occurs, whatever was collected up to that point
+   */
+  public static List<String> getMasters(TajoConf conf) {
+    List<String> masters = new ArrayList<String>();
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activeDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Entry file names encode the master address with ':' replaced by '_'.
+      for (FileStatus backupEntry : fs.listStatus(backupDir)) {
+        masters.add(backupEntry.getPath().getName().replaceAll("_", ":"));
+      }
+
+      FileStatus[] activeEntries = fs.listStatus(activeDir);
+      if (activeEntries.length == 1) {
+        masters.add(activeEntries[0].getPath().getName().replaceAll("_", ":"));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return masters;
+  }
+
+  /**
+   * Resolves the file system that hosts the Tajo root directory.
+   *
+   * @param conf configuration providing the Tajo root directory
+   * @return the file system of the Tajo root directory
+   * @throws IOException if the file system cannot be obtained
+   */
+  private static FileSystem getFileSystem(TajoConf conf) throws IOException {
+    return TajoConf.getTajoRootDir(conf).getFileSystem(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
deleted file mode 100644
index c6fdd40..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.net.InetSocketAddress;
-
-public class TajoMasterInfo {
-
-  private boolean available;
-  private boolean isActive;
-
-  private InetSocketAddress tajoMasterAddress;
-  private InetSocketAddress tajoClientAddress;
-  private InetSocketAddress workerResourceTrackerAddr;
-  private InetSocketAddress catalogAddress;
-  private InetSocketAddress webServerAddress;
-
-  public InetSocketAddress getTajoMasterAddress() {
-    return tajoMasterAddress;
-  }
-
-  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
-    this.tajoMasterAddress = tajoMasterAddress;
-  }
-
-  public InetSocketAddress getTajoClientAddress() {
-    return tajoClientAddress;
-  }
-
-  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
-    this.tajoClientAddress = tajoClientAddress;
-  }
-
-  public InetSocketAddress getWorkerResourceTrackerAddr() {
-    return workerResourceTrackerAddr;
-  }
-
-  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
-    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
-  }
-
-  public InetSocketAddress getCatalogAddress() {
-    return catalogAddress;
-  }
-
-  public void setCatalogAddress(InetSocketAddress catalogAddress) {
-    this.catalogAddress = catalogAddress;
-  }
-
-  public InetSocketAddress getWebServerAddress() {
-    return webServerAddress;
-  }
-
-  public void setWebServerAddress(InetSocketAddress webServerAddress) {
-    this.webServerAddress = webServerAddress;
-  }
-
-  public boolean isAvailable() {
-    return available;
-  }
-
-  public void setAvailable(boolean available) {
-    this.available = available;
-  }
-
-  public boolean isActive() {
-    return isActive;
-  }
-
-  public void setActive(boolean active) {
-    isActive = active;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 42ffd87..996d356 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -38,6 +38,8 @@ import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.worker.TajoWorker;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -45,6 +47,7 @@ import java.util.List;
 
 public class TajoContainerProxy extends ContainerProxy {
   private final QueryContext queryContext;
+  private final TajoWorker.WorkerContext workerContext;
   private final String planJson;
 
   public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
@@ -52,6 +55,7 @@ public class TajoContainerProxy extends ContainerProxy {
                             QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
     super(context, conf, executionBlockId, container);
     this.queryContext = queryContext;
+    this.workerContext = context.getQueryMasterContext().getWorkerContext();
     this.planJson = planJson;
   }
 
@@ -171,27 +175,8 @@ public class TajoContainerProxy extends ContainerProxy {
     RpcConnectionPool connPool = RpcConnectionPool.getPool();
     NettyClientBase tmClient = null;
     try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      TajoConf conf = context.getConf();
-      if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-        } catch (Exception e) {
-          context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(conf));
-          context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(conf));
-          tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            QueryCoordinatorProtocol.class, true);
-      }
+      ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker();
+      tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
 
       QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 786025a..a11606f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -40,18 +40,18 @@ import org.apache.tajo.catalog.LocalCatalogWrapper;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.FunctionLoader;
-import org.apache.tajo.ha.HAService;
-import org.apache.tajo.ha.HAServiceHDFSImpl;
-import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
-import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.SessionManager;
+import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rule.EvaluationContext;
 import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.session.SessionManager;
 import org.apache.tajo.storage.FileStorageManager;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.*;
@@ -127,7 +127,7 @@ public class TajoMaster extends CompositeService {
 
   private TajoSystemMetrics systemMetrics;
 
-  private HAService haService;
+  private ServiceTracker haService;
 
   private JvmPauseMonitor pauseMonitor;
 
@@ -226,15 +226,6 @@ public class TajoMaster extends CompositeService {
     }
   }
 
-
-  private void initHAManger() throws Exception {
-    // If tajo provides haService based on ZooKeeper, following codes need to update.
-    if (systemConf.getBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      haService = new HAServiceHDFSImpl(context);
-      haService.register();
-    }
-  }
-
   public boolean isActiveMaster() {
     return (haService != null ? haService.isActiveStatus() : true);
   }
@@ -326,11 +317,8 @@ public class TajoMaster extends CompositeService {
 
     initSystemMetrics();
 
-    try {
-      initHAManger();
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
+    haService = ServiceTrackerFactory.get(systemConf);
+    haService.register();
 
     historyWriter = new HistoryWriter(getMasterName(), true);
     historyWriter.init(getConfig());
@@ -477,7 +465,7 @@ public class TajoMaster extends CompositeService {
       return systemMetrics;
     }
 
-    public HAService getHAService() {
+    public ServiceTracker getHAService() {
       return haService;
     }
 


Mime
View raw message