tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [02/13] tajo git commit: TAJO-1221: HA TajoClient should not connect TajoMaster at the first. (jaehwa)
Date Fri, 19 Dec 2014 12:48:50 GMT
TAJO-1221: HA TajoClient should not connect TajoMaster at the first. (jaehwa)

Closes #286


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2548768c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2548768c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2548768c

Branch: refs/heads/index_support
Commit: 2548768cfed8f018727f6f054317b2adb4d7f485
Parents: e025e3c
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Thu Dec 18 11:41:13 2014 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Thu Dec 18 11:43:06 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/catalog/AbstractCatalogClient.java     |   2 +-
 .../org/apache/tajo/cli/tools/TajoAdmin.java    |   2 +-
 .../org/apache/tajo/cli/tools/TajoHAAdmin.java  |   2 +-
 .../apache/tajo/client/SessionConnection.java   |   2 +-
 .../org/apache/tajo/client/TajoClientImpl.java  |   5 +-
 .../apache/tajo/client/TajoHAClientUtil.java    |  20 +-
 .../java/org/apache/tajo/ha/HAConstants.java    |  27 ++
 .../java/org/apache/tajo/ha/HAServiceUtil.java  | 292 ++++++++++++++++++
 .../org/apache/tajo/util/HAServiceUtil.java     | 293 -------------------
 .../apache/tajo/master/TajoContainerProxy.java  |   2 +-
 .../tajo/master/ha/HAServiceHDFSImpl.java       | 100 +++++--
 .../master/querymaster/QueryJobManager.java     |   5 +-
 .../tajo/master/querymaster/QueryMaster.java    |   2 +-
 .../master/querymaster/QueryMasterTask.java     |   2 +-
 .../tajo/worker/TajoResourceAllocator.java      |   2 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   1 +
 .../tajo/worker/WorkerHeartbeatService.java     |   2 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |   2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   3 +
 .../tajo/master/ha/TestHAServiceHDFSImpl.java   | 115 +++++---
 21 files changed, 494 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 65492d2..de63285 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1221: HA TajoClient should not connect TajoMaster at the first. (jaehwa)
+
     TAJO-1241: Change default client and table time zone behavior. (hyunsik)
 
     TAJO-1243: *-site.xml.template should have default configs commented out.

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index dde6980..6b50115 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -33,7 +33,7 @@ import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.net.InetSocketAddress;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
index 5ef8d76..88b8e0f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
@@ -28,7 +28,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
 import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
index 12d94ad..ef8fee9 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
@@ -24,7 +24,7 @@ import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.client.TajoHAClientUtil;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 
 import java.io.IOException;
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 44b772b..db2bd2a 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -33,7 +33,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index dff8d65..8eafc91 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -33,6 +33,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.jdbc.TajoResultSet;
@@ -56,11 +57,11 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
   CatalogAdminClient catalogClient;
 
   public TajoClientImpl(TajoConf conf) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
+    this(conf, TajoHAClientUtil.getRpcClientAddress(conf), null);
   }
 
   public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
+    this(conf, TajoHAClientUtil.getRpcClientAddress(conf), baseDatabase);
   }
 
   public TajoClientImpl(InetSocketAddress addr) throws IOException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
index b95fb35..12a9ec8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -39,12 +39,13 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.tajo.cli.tsql.TajoCli.TajoCliContext;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 public class TajoHAClientUtil {
-
   /**
    * In TajoMaster HA mode, if TajoCli can't connect existing active master,
    * this should try to connect new active master.
@@ -65,11 +66,11 @@ public class TajoHAClientUtil {
 
     if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
       if (!HAServiceUtil.isMasterAlive(conf.getVar(
-          TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
+        TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
         TajoClient tajoClient = null;
         String baseDatabase = client.getBaseDatabase();
         conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
-            HAServiceUtil.getMasterClientName(conf));
+          HAServiceUtil.getMasterClientName(conf));
         client.close();
         tajoClient = new TajoClientImpl(conf, baseDatabase);
 
@@ -84,4 +85,15 @@ public class TajoHAClientUtil {
       return client;
     }
   }
+
+
+  public static InetSocketAddress getRpcClientAddress(TajoConf conf) {
+    if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      return NetUtils.createSocketAddr(HAServiceUtil.getMasterClientName(conf));
+    } else {
+      return NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+        .TAJO_MASTER_CLIENT_RPC_ADDRESS));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java
new file mode 100644
index 0000000..c5f4b8a
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+public class HAConstants {
+  public final static int MASTER_UMBILICAL_RPC_ADDRESS = 1;
+  public final static int MASTER_CLIENT_RPC_ADDRESS = 2;
+  public final static int RESOURCE_TRACKER_RPC_ADDRESS = 3;
+  public final static int CATALOG_ADDRESS = 4;
+  public final static int MASTER_INFO_ADDRESS = 5;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
new file mode 100644
index 0000000..b62d73b
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.NetUtils;
+
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HAServiceUtil {
+  private static Log LOG = LogFactory.getLog(HAServiceUtil.class);
+
+  public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
+    return getMasterAddress(conf, HAConstants.MASTER_UMBILICAL_RPC_ADDRESS);
+  }
+
+  public static String getMasterUmbilicalName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
+  }
+
+  public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
+    return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS);
+  }
+
+  public static String getMasterClientName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
+  }
+
+  public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
+    return getMasterAddress(conf, HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
+  }
+
+  public static String getResourceTrackerName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
+  }
+
+  public static InetSocketAddress getCatalogAddress(TajoConf conf) {
+    return getMasterAddress(conf, HAConstants.CATALOG_ADDRESS);
+  }
+
+  public static String getCatalogName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
+  }
+
+  public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
+    return getMasterAddress(conf, HAConstants.MASTER_INFO_ADDRESS);
+  }
+
+  public static String getMasterInfoName(TajoConf conf) {
+    return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
+  }
+
+  public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
+    InetSocketAddress masterAddress = null;
+
+    if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      try {
+        FileSystem fs = getFileSystem(conf);
+        Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+
+        if (fs.exists(activePath)) {
+          FileStatus[] files = fs.listStatus(activePath);
+
+          if (files.length == 1) {
+            Path file = files[0].getPath();
+            String hostAddress = file.getName().replaceAll("_", ":");
+            FSDataInputStream stream = fs.open(file);
+            String data = stream.readUTF();
+            stream.close();
+
+            String[] addresses = data.split("_");
+
+            switch (type) {
+              case 1:
+                masterAddress = NetUtils.createSocketAddr(hostAddress);
+                break;
+              case 2:
+                masterAddress = NetUtils.createSocketAddr(addresses[0]);
+                break;
+              case 3:
+                masterAddress = NetUtils.createSocketAddr(addresses[1]);
+                break;
+              case 4:
+                masterAddress = NetUtils.createSocketAddr(addresses[2]);
+                break;
+              case 5:
+                masterAddress = NetUtils.createSocketAddr(addresses[3]);
+                break;
+              default:
+                break;
+            }
+          }
+        }
+
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    if (masterAddress == null) {
+      switch (type) {
+        case 1:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+          break;
+        case 2:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .TAJO_MASTER_CLIENT_RPC_ADDRESS));
+          break;
+        case 3:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .RESOURCE_TRACKER_RPC_ADDRESS));
+          break;
+        case 4:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .CATALOG_ADDRESS));
+          break;
+        case 5:
+          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+              .TAJO_MASTER_INFO_ADDRESS));
+          break;
+        default:
+          break;
+      }
+    }
+
+    return masterAddress;
+  }
+
+  public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
+    return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
+  }
+
+  public static boolean isMasterAlive(String masterName, TajoConf conf) {
+    boolean isAlive = true;
+
+    try {
+      // how to create sockets
+      SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
+
+      int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
+          CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
+
+      InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
+
+      // connected socket
+      Socket socket = socketFactory.createSocket();
+      org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
+    } catch (Exception e) {
+      isAlive = false;
+    }
+    return isAlive;
+  }
+
+  public static int getState(String masterName, TajoConf conf) {
+    String targetMaster = masterName.replaceAll(":", "_");
+    int retValue = -1;
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      Path temPath = null;
+
+      // Check backup masters
+      FileStatus[] files = fs.listStatus(backupPath);
+      for (FileStatus status : files) {
+        temPath = status.getPath();
+        if (temPath.getName().equals(targetMaster)) {
+          return 0;
+        }
+      }
+
+      // Check active master
+      files = fs.listStatus(activePath);
+      if (files.length == 1) {
+        temPath = files[0].getPath();
+        if (temPath.getName().equals(targetMaster)) {
+          return 1;
+        }
+      }
+      retValue = -2;
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return retValue;
+  }
+
+  public static int formatHA(TajoConf conf) {
+    int retValue = -1;
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+      Path temPath = null;
+
+      int aliveMasterCount = 0;
+      // Check backup masters
+      FileStatus[] files = fs.listStatus(backupPath);
+      for (FileStatus status : files) {
+        temPath = status.getPath();
+        if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+          aliveMasterCount++;
+        }
+      }
+
+      // Check active master
+      files = fs.listStatus(activePath);
+      if (files.length == 1) {
+        temPath = files[0].getPath();
+        if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+          aliveMasterCount++;
+        }
+      }
+
+      // If there is any alive master, users can't format storage.
+      if (aliveMasterCount > 0) {
+        return 0;
+      }
+
+      // delete ha path.
+      fs.delete(TajoConf.getSystemHADir(conf), true);
+      retValue = 1;
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return retValue;
+  }
+
+
+  public static List<String> getMasters(TajoConf conf) {
+    List<String> list = new ArrayList<String>();
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+      Path temPath = null;
+
+      // Check backup masters
+      FileStatus[] files = fs.listStatus(backupPath);
+      for (FileStatus status : files) {
+        temPath = status.getPath();
+        list.add(temPath.getName().replaceAll("_", ":"));
+      }
+
+      // Check active master
+      files = fs.listStatus(activePath);
+      if (files.length == 1) {
+        temPath = files[0].getPath();
+        list.add(temPath.getName().replaceAll("_", ":"));
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return list;
+  }
+
+  private static FileSystem getFileSystem(TajoConf conf) throws IOException {
+    Path rootPath = TajoConf.getTajoRootDir(conf);
+    return rootPath.getFileSystem(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
deleted file mode 100644
index 4f03113..0000000
--- a/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util;
-
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-
-import javax.net.SocketFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-
-public class HAServiceUtil {
-
-  private final static int MASTER_UMBILICAL_RPC_ADDRESS = 1;
-  private final static int MASTER_CLIENT_RPC_ADDRESS = 2;
-  private final static int RESOURCE_TRACKER_RPC_ADDRESS = 3;
-  private final static int CATALOG_ADDRESS = 4;
-  private final static int MASTER_INFO_ADDRESS = 5;
-
-  public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
-    return getMasterAddress(conf, MASTER_UMBILICAL_RPC_ADDRESS);
-  }
-
-  public static String getMasterUmbilicalName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
-  }
-
-  public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
-    return getMasterAddress(conf, MASTER_CLIENT_RPC_ADDRESS);
-  }
-
-  public static String getMasterClientName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
-  }
-
-  public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
-    return getMasterAddress(conf, RESOURCE_TRACKER_RPC_ADDRESS);
-  }
-
-  public static String getResourceTrackerName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
-  }
-
-  public static InetSocketAddress getCatalogAddress(TajoConf conf) {
-    return getMasterAddress(conf, CATALOG_ADDRESS);
-  }
-
-  public static String getCatalogName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
-  }
-
-  public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
-    return getMasterAddress(conf, MASTER_INFO_ADDRESS);
-  }
-
-  public static String getMasterInfoName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
-  }
-
-  public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
-    InetSocketAddress masterAddress = null;
-
-    if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      try {
-        FileSystem fs = getFileSystem(conf);
-        Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-
-        if (fs.exists(activePath)) {
-          FileStatus[] files = fs.listStatus(activePath);
-
-          if (files.length == 1) {
-            Path file = files[0].getPath();
-            String hostAddress = file.getName().replaceAll("_", ":");
-            FSDataInputStream stream = fs.open(file);
-            String data = stream.readUTF();
-            stream.close();
-
-            String[] addresses = data.split("_");
-
-            switch (type) {
-              case 1:
-                masterAddress = NetUtils.createSocketAddr(hostAddress);
-                break;
-              case 2:
-                masterAddress = NetUtils.createSocketAddr(addresses[0]);
-                break;
-              case 3:
-                masterAddress = NetUtils.createSocketAddr(addresses[1]);
-                break;
-              case 4:
-                masterAddress = NetUtils.createSocketAddr(addresses[2]);
-                break;
-              case 5:
-                masterAddress = NetUtils.createSocketAddr(addresses[3]);
-                break;
-              default:
-                break;
-            }
-          }
-        }
-
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-
-    if (masterAddress == null) {
-      switch (type) {
-        case 1:
-          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
-              .TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
-          break;
-        case 2:
-          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
-              .TAJO_MASTER_CLIENT_RPC_ADDRESS));
-          break;
-        case 3:
-          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
-              .RESOURCE_TRACKER_RPC_ADDRESS));
-          break;
-        case 4:
-          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
-              .CATALOG_ADDRESS));
-          break;
-        case 5:
-          masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
-              .TAJO_MASTER_INFO_ADDRESS));
-          break;
-        default:
-          break;
-      }
-    }
-
-    return masterAddress;
-  }
-
-  public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
-    return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
-  }
-
-  public static boolean isMasterAlive(String masterName, TajoConf conf) {
-    boolean isAlive = true;
-
-    try {
-      // how to create sockets
-      SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
-
-      int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
-          CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
-
-      InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
-
-      // connected socket
-      Socket socket = socketFactory.createSocket();
-      org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
-    } catch (Exception e) {
-      isAlive = false;
-    }
-    return isAlive;
-  }
-
-  public static int getState(String masterName, TajoConf conf) {
-    String targetMaster = masterName.replaceAll(":", "_");
-    int retValue = -1;
-
-    try {
-      FileSystem fs = getFileSystem(conf);
-      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
-
-      Path temPath = null;
-
-      // Check backup masters
-      FileStatus[] files = fs.listStatus(backupPath);
-      for (FileStatus status : files) {
-        temPath = status.getPath();
-        if (temPath.getName().equals(targetMaster)) {
-          return 0;
-        }
-      }
-
-      // Check active master
-      files = fs.listStatus(activePath);
-      if (files.length == 1) {
-        temPath = files[0].getPath();
-        if (temPath.getName().equals(targetMaster)) {
-          return 1;
-        }
-      }
-      retValue = -2;
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-    return retValue;
-  }
-
-  public static int formatHA(TajoConf conf) {
-    int retValue = -1;
-    try {
-      FileSystem fs = getFileSystem(conf);
-      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
-      Path temPath = null;
-
-      int aliveMasterCount = 0;
-      // Check backup masters
-      FileStatus[] files = fs.listStatus(backupPath);
-      for (FileStatus status : files) {
-        temPath = status.getPath();
-        if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
-          aliveMasterCount++;
-        }
-      }
-
-      // Check active master
-      files = fs.listStatus(activePath);
-      if (files.length == 1) {
-        temPath = files[0].getPath();
-        if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
-          aliveMasterCount++;
-        }
-      }
-
-      // If there is any alive master, users can't format storage.
-      if (aliveMasterCount > 0) {
-        return 0;
-      }
-
-      // delete ha path.
-      fs.delete(TajoConf.getSystemHADir(conf), true);
-      retValue = 1;
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-    return retValue;
-  }
-
-
-  public static List<String> getMasters(TajoConf conf) {
-    List<String> list = new ArrayList<String>();
-
-    try {
-      FileSystem fs = getFileSystem(conf);
-      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
-      Path temPath = null;
-
-      // Check backup masters
-      FileStatus[] files = fs.listStatus(backupPath);
-      for (FileStatus status : files) {
-        temPath = status.getPath();
-        list.add(temPath.getName().replaceAll("_", ":"));
-      }
-
-      // Check active master
-      files = fs.listStatus(activePath);
-      if (files.length == 1) {
-        temPath = files[0].getPath();
-        list.add(temPath.getName().replaceAll("_", ":"));
-      }
-
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-    return list;
-  }
-
-  private static FileSystem getFileSystem(TajoConf conf) throws IOException {
-    Path rootPath = TajoConf.getTajoRootDir(conf);
-    return rootPath.getFileSystem(conf);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 158316e..0d2acf7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -36,7 +36,7 @@ import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
index 26bc97b..45219b3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
@@ -25,12 +25,14 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAConstants;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.List;
 
 /**
@@ -65,7 +67,10 @@ public class HAServiceHDFSImpl implements HAService {
     this.context = context;
     this.conf = context.getConf();
     initSystemDirectory();
-    this.masterName = conf.get(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS.varname);
+
+    InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
+    this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
+
     monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
   }
 
@@ -133,7 +138,6 @@ public class HAServiceHDFSImpl implements HAService {
   }
 
   private void createMasterFile(boolean isActive) throws IOException {
-    String hostName = masterName.split(":")[0];
     String fileName = masterName.replaceAll(":", "_");
     Path path = null;
 
@@ -144,20 +148,27 @@ public class HAServiceHDFSImpl implements HAService {
     }
 
     StringBuilder sb = new StringBuilder();
-    sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname,
-        hostName + ":26002"));
-    sb.append("_");
-    sb.append(context.getConf().get(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS.varname,
-        hostName + ":26003"));
-    sb.append("_");
-    sb.append(context.getConf().get(TajoConf.ConfVars.CATALOG_ADDRESS.varname, hostName + ":26005"));
-    sb.append("_");
-    sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS.varname,
-        hostName + ":26080"));
+    InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.CATALOG_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
 
     FSDataOutputStream out = fs.create(path);
-    out.writeUTF(sb.toString());
-    out.close();
+
+    try {
+      out.writeUTF(sb.toString());
+      out.hflush();
+      out.close();
+    } catch (FileAlreadyExistsException e) {
+      createMasterFile(false);
+    }
 
     if (isActive) {
       isActiveStatus = true;
@@ -169,6 +180,36 @@ public class HAServiceHDFSImpl implements HAService {
   }
 
 
+  private InetSocketAddress getHostAddress(int type) {
+    InetSocketAddress address = null;
+
+    switch (type) {
+      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+        break;
+      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .TAJO_MASTER_CLIENT_RPC_ADDRESS);
+        break;
+      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .RESOURCE_TRACKER_RPC_ADDRESS);
+        break;
+      case HAConstants.CATALOG_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .CATALOG_ADDRESS);
+        break;
+      case HAConstants.MASTER_INFO_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+        .TAJO_MASTER_INFO_ADDRESS);
+      default:
+        break;
+    }
+
+    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
+  }
+
   @Override
   public void delete() throws IOException {
     String fileName = masterName.replaceAll(":", "_");
@@ -196,9 +237,6 @@ public class HAServiceHDFSImpl implements HAService {
   @Override
   public List<TajoMasterInfo> getMasters() throws IOException {
     List<TajoMasterInfo> list = TUtil.newList();
-    boolean isAlive = false;
-    TajoMasterInfo info = null;
-    String hostAddress = null;
     Path path = null;
 
     FileStatus[] files = fs.listStatus(activePath);
@@ -246,20 +284,22 @@ public class HAServiceHDFSImpl implements HAService {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
         synchronized (HAServiceHDFSImpl.this) {
           try {
-            boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
+            if (!currentActiveMaster.equals(masterName)) {
+              boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
                   + ", isAlive:" + isAlive);
-            }
+              }
 
-            // If active master is dead, this master should be active master instead of
-            // previous active master.
-            if (!isAlive) {
-              FileStatus[] files = fs.listStatus(activePath);
-              if (files.length == 0 || (files.length ==  1
-                && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
-                delete();
-                register();
+              // If active master is dead, this master should be active master instead of
+              // previous active master.
+              if (!isAlive) {
+                FileStatus[] files = fs.listStatus(activePath);
+                if (files.length == 0 || (files.length ==  1
+                  && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
+                  delete();
+                  register();
+                }
               }
             }
           } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 536f6ac..ddbd3e1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.master.querymaster;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -52,9 +53,9 @@ public class QueryJobManager extends CompositeService {
 
   private SimpleFifoScheduler scheduler;
 
-  private final Map<QueryId, QueryInProgress> submittedQueries = new HashMap<QueryId, QueryInProgress>();
+  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
 
-  private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
+  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
 
   private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
   private AtomicLong maxExecutionTime = new AtomicLong();

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 42fac3a..7ddd787 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -32,6 +32,7 @@ import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoAsyncDispatcher;
@@ -41,7 +42,6 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.worker.TajoWorker;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 1eaef0f..8f63416 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -64,7 +64,7 @@ import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageProperty;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
 import org.apache.tajo.worker.AbstractResourceAllocator;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 9345885..f055733 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -47,7 +47,7 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 
 import java.net.InetSocketAddress;
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 0f0c1f9..4d96529 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -35,6 +35,7 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.ha.TajoMasterInfo;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index f12e83c..c809921 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -35,7 +35,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.DiskDeviceInfo;
 import org.apache.tajo.storage.DiskMountInfo;
 import org.apache.tajo.storage.DiskUtil;
-import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.ha.HAServiceUtil;
 
 import java.io.File;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 328a31b..6eb710a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -31,7 +32,6 @@ import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
 import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
 import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
 import org.apache.tajo.rule.SelfDiagnosisRule;
-import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 64c27e0..9868297 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -157,7 +157,10 @@ public class TajoTestingCluster {
     if (!StringUtils.isEmpty(LOG_LEVEL)) {
       Level defaultLevel = Logger.getRootLogger().getLevel();
       Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
+      Logger.getLogger("org.apache.tajo.master.TajoAsyncDispatcher").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(),
+        defaultLevel));
       Logger.getLogger("org.apache.hadoop").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
+      Logger.getLogger("org.apache.zookeeper").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
       Logger.getLogger("BlockStateChange").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
index 69ed556..e1806e1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
@@ -26,12 +26,16 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.master.TajoMaster;
 import org.junit.Test;
 
+import java.util.List;
+
 import static junit.framework.TestCase.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -41,86 +45,90 @@ public class TestHAServiceHDFSImpl  {
   private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class);
 
   private TajoTestingCluster cluster;
-  private TajoMaster backupMaster1, backupMaster2;
+  private TajoMaster backupMaster;
 
   private TajoConf conf;
   private TajoClient client;
-  private Path testDir;
 
   private Path haPath, activePath, backupPath;
 
-  private static final String LOCAL_HOST =  "localhost:";
+  private String masterAddress;
 
   @Test
-  public final void testTwoBackupMasters() throws Exception {
+  public final void testAutoFailOver() throws Exception {
     cluster = new TajoTestingCluster(true);
-    cluster.startMiniCluster(1);
 
+    cluster.startMiniCluster(1);
     conf = cluster.getConfiguration();
     client = new TajoClientImpl(conf);
+
     try {
       FileSystem fs = cluster.getDefaultFileSystem();
-      startBackupMasters();
 
-      verifyMasterAddress();
-      verifySystemDirectories(fs);
+      masterAddress = HAServiceUtil.getMasterUmbilicalName(conf).split(":")[0];
+
+      setConfiguration();
 
-      Path backupMasterFile1 = new Path(backupPath, backupMaster1.getMasterName()
-          .replaceAll(":", "_"));
-      assertTrue(fs.exists(backupMasterFile1));
+      backupMaster = new TajoMaster();
+      backupMaster.init(conf);
+      backupMaster.start();
 
-      Path backupMasterFile2 = new Path(backupPath, backupMaster2.getMasterName()
-          .replaceAll(":", "_"));
-      assertTrue(fs.exists(backupMasterFile2));
+      assertNotEquals(cluster.getMaster().getMasterName(), backupMaster.getMasterName());
+
+      verifySystemDirectories(fs);
+
+      Path backupMasterFile = new Path(backupPath, backupMaster.getMasterName()
+        .replaceAll(":", "_"));
+      assertTrue(fs.exists(backupMasterFile));
 
       assertTrue(cluster.getMaster().isActiveMaster());
-      assertFalse(backupMaster1.isActiveMaster());
-      assertFalse(backupMaster2.isActiveMaster());
+      assertFalse(backupMaster.isActiveMaster());
+
+      createDatabaseAndTable();
+      verifyDataBaseAndTable();
+      client.close();
+
+      cluster.getMaster().stop();
+
+      Thread.sleep(7000);
+
+      assertFalse(cluster.getMaster().isActiveMaster());
+      assertTrue(backupMaster.isActiveMaster());
+
+      client = new TajoClientImpl(conf);
+      verifyDataBaseAndTable();
     } finally {
-      IOUtils.cleanup(LOG, client, backupMaster1, backupMaster2);
+      client.close();
+      backupMaster.stop();
       cluster.shutdownMiniCluster();
     }
   }
 
-  private void startBackupMasters() throws Exception {
-
+  private void setConfiguration() {
     conf = cluster.getConfiguration();
-    conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
-    conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
-    conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
-    conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
-    conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true);
-
-    backupMaster1 = new TajoMaster();
-    backupMaster1.init(conf);
-    backupMaster1.start();
 
     conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
+      masterAddress + ":" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
+      masterAddress + ":" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
+      masterAddress + ":" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS,
-        LOCAL_HOST + NetUtils.getFreeSocketPort());
+      masterAddress + ":" + NetUtils.getFreeSocketPort());
+    conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS,
+      masterAddress + ":" + NetUtils.getFreeSocketPort());
     conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true);
 
-    backupMaster2 = new TajoMaster();
-    backupMaster2.init(conf);
-    backupMaster2.start();
-  }
+    //Client API service RPC Server
+    conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
 
-  private void verifyMasterAddress() {
-    assertNotEquals(cluster.getMaster().getMasterName(),
-        backupMaster1.getMasterName());
-    assertNotEquals(cluster.getMaster().getMasterName(),
-        backupMaster2.getMasterName());
-    assertNotEquals(backupMaster1.getMasterName(),
-        backupMaster2.getMasterName());
+    // Internal RPC Server
+    conf.setIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
+    conf.setIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2);
+    conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
   }
 
   private void verifySystemDirectories(FileSystem fs) throws Exception {
@@ -134,6 +142,17 @@ public class TestHAServiceHDFSImpl  {
     assertTrue(fs.exists(backupPath));
 
     assertEquals(1, fs.listStatus(activePath).length);
-    assertEquals(2, fs.listStatus(backupPath).length);
+    assertEquals(1, fs.listStatus(backupPath).length);
+  }
+
+  private void createDatabaseAndTable() throws Exception {
+    client.executeQuery("CREATE TABLE default.table1 (age int);");
+    client.executeQuery("CREATE TABLE default.table2 (age int);");
+  }
+
+  private void verifyDataBaseAndTable() throws Exception {
+    client.existDatabase("default");
+    client.existTable("default.table1");
+    client.existTable("default.table2");
   }
 }


Mime
View raw message